Skip to main content

purple_ssh/
snippet.rs

1use std::io;
2use std::path::{Path, PathBuf};
3use std::process::{Command, ExitStatus, Stdio};
4
5use crate::fs_util;
6
/// A saved command snippet.
///
/// On disk each snippet is one INI-style section: the section header holds
/// the name, and the `command` / `description` keys hold the other fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Snippet {
    /// Name of the snippet; written as the `[name]` section header and used
    /// as the lookup key by the store.
    pub name: String,
    /// Command text. A snippet whose command is empty is dropped when the
    /// snippets file is parsed.
    pub command: String,
    /// Free-form description. An empty string means "no description" and is
    /// omitted when the store is saved.
    pub description: String,
}
14
/// Result of running a snippet on a host.
pub struct SnippetResult {
    /// Exit status of the local `ssh` process.
    pub status: ExitStatus,
    /// Captured standard output; empty when output was not captured.
    pub stdout: String,
    /// Captured standard error; empty when output was not captured.
    pub stderr: String,
}
21
/// Snippet storage backed by ~/.purple/snippets (INI-style).
#[derive(Debug, Clone, Default)]
pub struct SnippetStore {
    /// Snippets in file order; names are unique after parsing.
    pub snippets: Vec<Snippet>,
    /// File that `save()` writes to. `load()` fills this in with the path it
    /// resolved; `parse()` leaves it as `None`. When it is `None`, `save()`
    /// returns an error rather than guessing a location.
    pub path_override: Option<PathBuf>,
}
29
30fn config_path(paths: Option<&crate::runtime::env::Paths>) -> Option<PathBuf> {
31    paths.map(crate::runtime::env::Paths::snippets_dir)
32}
33
34impl SnippetStore {
35    /// Load snippets from `~/.purple/snippets`, resolved from the injected
36    /// `paths`. The resolved path is stored as `path_override` so a later
37    /// `save()` writes back to the same location without re-resolving.
38    /// Returns an empty store when the file does not exist (normal
39    /// first-use) or when no home directory is known.
40    pub fn load(paths: Option<&crate::runtime::env::Paths>) -> Self {
41        let path = match config_path(paths) {
42            Some(p) => p,
43            None => return Self::default(),
44        };
45        let content = match std::fs::read_to_string(&path) {
46            Ok(c) => c,
47            Err(e) if e.kind() == io::ErrorKind::NotFound => {
48                return Self {
49                    path_override: Some(path),
50                    ..Self::default()
51                };
52            }
53            Err(e) => {
54                log::warn!("[config] Could not read {}: {}", path.display(), e);
55                return Self {
56                    path_override: Some(path),
57                    ..Self::default()
58                };
59            }
60        };
61        Self {
62            path_override: Some(path),
63            ..Self::parse(&content)
64        }
65    }
66
67    /// Parse INI-style snippet config.
68    pub fn parse(content: &str) -> Self {
69        let mut snippets = Vec::new();
70        let mut current: Option<Snippet> = None;
71
72        for line in content.lines() {
73            let trimmed = line.trim();
74            if trimmed.is_empty() || trimmed.starts_with('#') {
75                continue;
76            }
77            if trimmed.starts_with('[') && trimmed.ends_with(']') {
78                if let Some(snippet) = current.take() {
79                    if !snippet.command.is_empty()
80                        && !snippets.iter().any(|s: &Snippet| s.name == snippet.name)
81                    {
82                        snippets.push(snippet);
83                    }
84                }
85                let name = trimmed[1..trimmed.len() - 1].trim().to_string();
86                if snippets.iter().any(|s| s.name == name) {
87                    current = None;
88                    continue;
89                }
90                current = Some(Snippet {
91                    name,
92                    command: String::new(),
93                    description: String::new(),
94                });
95            } else if let Some(ref mut snippet) = current {
96                if let Some((key, value)) = trimmed.split_once('=') {
97                    let key = key.trim();
98                    // Trim whitespace around key but preserve value content
99                    // (only trim leading whitespace after '=', not trailing)
100                    let value = value.trim_start().to_string();
101                    match key {
102                        "command" => snippet.command = value,
103                        "description" => snippet.description = value,
104                        _ => {}
105                    }
106                }
107            }
108        }
109        if let Some(snippet) = current {
110            if !snippet.command.is_empty() && !snippets.iter().any(|s| s.name == snippet.name) {
111                snippets.push(snippet);
112            }
113        }
114        Self {
115            snippets,
116            path_override: None,
117        }
118    }
119
120    /// Save snippets to ~/.purple/snippets (atomic write, chmod 600).
121    pub fn save(&self) -> io::Result<()> {
122        if crate::demo_flag::is_demo() {
123            return Ok(());
124        }
125        let Some(path) = self.path_override.clone() else {
126            return Err(io::Error::new(
127                io::ErrorKind::NotFound,
128                "Could not determine home directory",
129            ));
130        };
131
132        let mut content = String::new();
133        for (i, snippet) in self.snippets.iter().enumerate() {
134            if i > 0 {
135                content.push('\n');
136            }
137            content.push_str(&format!("[{}]\n", snippet.name));
138            content.push_str(&format!("command={}\n", snippet.command));
139            if !snippet.description.is_empty() {
140                content.push_str(&format!("description={}\n", snippet.description));
141            }
142        }
143
144        fs_util::atomic_write(&path, content.as_bytes())
145    }
146
147    /// Get a snippet by name.
148    pub fn get(&self, name: &str) -> Option<&Snippet> {
149        self.snippets.iter().find(|s| s.name == name)
150    }
151
152    /// Add or replace a snippet.
153    pub fn set(&mut self, snippet: Snippet) {
154        if let Some(existing) = self.snippets.iter_mut().find(|s| s.name == snippet.name) {
155            *existing = snippet;
156        } else {
157            self.snippets.push(snippet);
158        }
159    }
160
161    /// Remove a snippet by name.
162    pub fn remove(&mut self, name: &str) {
163        self.snippets.retain(|s| s.name != name);
164    }
165}
166
167/// Validate a snippet name: non-empty, no leading/trailing whitespace,
168/// no `#`, no `[`, no `]`, no control characters.
169pub fn validate_name(name: &str) -> Result<(), String> {
170    if name.trim().is_empty() {
171        return Err(crate::messages::SNIPPET_NAME_EMPTY.to_string());
172    }
173    if name != name.trim() {
174        return Err(crate::messages::SNIPPET_NAME_WHITESPACE.to_string());
175    }
176    if name.contains('#') || name.contains('[') || name.contains(']') {
177        return Err(crate::messages::SNIPPET_NAME_INVALID_CHARS.to_string());
178    }
179    if name.contains(|c: char| c.is_control()) {
180        return Err(crate::messages::SNIPPET_NAME_CONTROL_CHARS.to_string());
181    }
182    Ok(())
183}
184
185/// Validate a snippet command: non-empty, no control characters (except tab).
186pub fn validate_command(command: &str) -> Result<(), String> {
187    if command.trim().is_empty() {
188        return Err(crate::messages::SNIPPET_COMMAND_EMPTY.to_string());
189    }
190    if command.contains(|c: char| c.is_control() && c != '\t') {
191        return Err(crate::messages::SNIPPET_COMMAND_CONTROL_CHARS.to_string());
192    }
193    Ok(())
194}
195
196// =========================================================================
197// Parameter support
198// =========================================================================
199
/// A parameter found in a snippet command template.
#[derive(Debug, Clone, PartialEq)]
pub struct SnippetParam {
    /// Placeholder name, i.e. the text before the first `:` in `{{...}}`.
    pub name: String,
    /// Text after the first `:` in `{{name:default}}`; `None` when the
    /// placeholder has no `:`.
    pub default: Option<String>,
}
206
/// Quote `s` for a POSIX shell by wrapping it in single quotes.
/// Each embedded single quote becomes `'\''` (close quote, escaped quote,
/// reopen quote), so the result is always one shell word.
pub fn shell_escape(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
212
213/// Parse `{{name}}` and `{{name:default}}` from a command string.
214/// Returns params in order of first appearance, deduplicated. Max 20 params.
215pub fn parse_params(command: &str) -> Vec<SnippetParam> {
216    let mut params = Vec::new();
217    let mut seen = std::collections::HashSet::new();
218    let bytes = command.as_bytes();
219    let len = bytes.len();
220    let mut i = 0;
221    while i + 3 < len {
222        if bytes[i] == b'{' && bytes.get(i + 1) == Some(&b'{') {
223            if let Some(end) = command[i + 2..].find("}}") {
224                let inner = &command[i + 2..i + 2 + end];
225                let (name, default) = if let Some((n, d)) = inner.split_once(':') {
226                    (n.to_string(), Some(d.to_string()))
227                } else {
228                    (inner.to_string(), None)
229                };
230                if validate_param_name(&name).is_ok() && !seen.contains(&name) && params.len() < 20
231                {
232                    seen.insert(name.clone());
233                    params.push(SnippetParam { name, default });
234                }
235                i = i + 2 + end + 2;
236                continue;
237            }
238        }
239        i += 1;
240    }
241    params
242}
243
/// Check that `name` is a legal placeholder name: non-empty and made only
/// of alphanumeric characters (Unicode-aware), `_` and `-`.
/// This rules out `{`, `}`, `'`, `:`, whitespace and control characters.
pub fn validate_param_name(name: &str) -> Result<(), String> {
    if name.is_empty() {
        return Err("Parameter name cannot be empty.".to_string());
    }
    let allowed = |c: char| c.is_alphanumeric() || matches!(c, '_' | '-');
    if name.chars().any(|c| !allowed(c)) {
        return Err(format!(
            "Parameter name '{}' contains invalid characters.",
            name
        ));
    }
    Ok(())
}
261
262/// Substitute parameters into a command template (single-pass).
263/// All values (user-provided and defaults) are shell-escaped.
264pub fn substitute_params(
265    command: &str,
266    values: &std::collections::HashMap<String, String>,
267) -> String {
268    let mut result = String::with_capacity(command.len());
269    let bytes = command.as_bytes();
270    let len = bytes.len();
271    let mut i = 0;
272    while i < len {
273        if i + 3 < len && bytes[i] == b'{' && bytes[i + 1] == b'{' {
274            if let Some(end) = command[i + 2..].find("}}") {
275                let inner = &command[i + 2..i + 2 + end];
276                let (name, default) = if let Some((n, d)) = inner.split_once(':') {
277                    (n, Some(d))
278                } else {
279                    (inner, None)
280                };
281                let value = values
282                    .get(name)
283                    .filter(|v| !v.is_empty())
284                    .map(|v| v.as_str())
285                    .or(default)
286                    .unwrap_or("");
287                result.push_str(&shell_escape(value));
288                i = i + 2 + end + 2;
289                continue;
290            }
291        }
292        // Properly decode UTF-8 character (not byte-level cast)
293        let ch = command[i..].chars().next().unwrap();
294        result.push(ch);
295        i += ch.len_utf8();
296    }
297    result
298}
299
300// =========================================================================
301// Output sanitization
302// =========================================================================
303
304/// Strip ANSI escape sequences and C1 control codes from output.
305/// Handles CSI, OSC, DCS, SOS, PM and APC sequences plus the C1 range 0x80-0x9F.
306pub fn sanitize_output(input: &str) -> String {
307    let mut out = String::with_capacity(input.len());
308    let mut chars = input.chars().peekable();
309    while let Some(c) = chars.next() {
310        match c {
311            '\x1b' => {
312                match chars.peek() {
313                    Some('[') => {
314                        chars.next();
315                        // CSI: consume until 0x40-0x7E
316                        while let Some(&ch) = chars.peek() {
317                            chars.next();
318                            if ('\x40'..='\x7e').contains(&ch) {
319                                break;
320                            }
321                        }
322                    }
323                    Some(']') | Some('P') | Some('X') | Some('^') | Some('_') => {
324                        chars.next();
325                        // OSC/DCS/SOS/PM/APC: consume until ST (ESC\) or BEL
326                        consume_until_st(&mut chars);
327                    }
328                    _ => {
329                        // Single ESC + one char
330                        chars.next();
331                    }
332                }
333            }
334            c if ('\u{0080}'..='\u{009F}').contains(&c) => {
335                // C1 control codes: skip
336            }
337            c if c.is_control() && c != '\n' && c != '\t' => {
338                // Other control chars (except newline/tab): skip
339            }
340            _ => out.push(c),
341        }
342    }
343    out
344}
345
/// Advance `chars` past the end of an escape string.
///
/// Stops after BEL (`\x07`), or after `ESC` plus an immediately following
/// `\` (the String Terminator). A lone `ESC` also ends the string, and the
/// character after it is left in the iterator. If neither terminator
/// appears, the iterator is drained.
fn consume_until_st(chars: &mut std::iter::Peekable<std::str::Chars<'_>>) {
    loop {
        match chars.next() {
            None | Some('\x07') => return,
            Some('\x1b') => {
                // Swallow the backslash only when it completes ESC\.
                let _ = chars.next_if_eq(&'\\');
                return;
            }
            Some(_) => {}
        }
    }
}
363
364// =========================================================================
365// Background snippet execution
366// =========================================================================
367
/// Maximum number of output lines kept per pipe (stdout and stderr are
/// capped separately). The reader keeps draining past this limit so the
/// child never blocks on a full pipe buffer; the extra lines are discarded.
/// The truncation notice written by `read_pipe_capped` spells this value
/// out as text, so keep the two in sync.
const MAX_OUTPUT_LINES: usize = 10_000;
371
372/// RAII guard that kills the process group on drop.
373/// Uses SIGTERM first, then escalates to SIGKILL after a brief wait.
374pub struct ChildGuard {
375    inner: std::sync::Mutex<Option<std::process::Child>>,
376    pgid: i32,
377}
378
379impl ChildGuard {
380    fn new(child: std::process::Child) -> Self {
381        // i32::try_from avoids silent overflow for PIDs > i32::MAX.
382        // Fallback -1 makes killpg a harmless no-op on overflow.
383        // In practice Linux caps PIDs well below i32::MAX.
384        let pgid = i32::try_from(child.id()).unwrap_or(-1);
385        Self {
386            inner: std::sync::Mutex::new(Some(child)),
387            pgid,
388        }
389    }
390}
391
392impl Drop for ChildGuard {
393    fn drop(&mut self) {
394        let mut lock = self.inner.lock().unwrap_or_else(|e| e.into_inner());
395        if let Some(ref mut child) = *lock {
396            // Already exited? Skip kill entirely (PID may be recycled).
397            if let Ok(Some(_)) = child.try_wait() {
398                return;
399            }
400            // SAFETY: self.pgid was set by setpgid(0,0) in pre_exec and is
401            // valid for the lifetime of this SnippetChild. kill() with a
402            // negative PID sends the signal to the entire process group.
403            // ESRCH (process already exited) is the expected race; the
404            // return value is intentionally ignored.
405            #[cfg(unix)]
406            unsafe {
407                libc::kill(-self.pgid, libc::SIGTERM);
408            }
409            // Poll for up to 500ms
410            let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
411            loop {
412                if let Ok(Some(_)) = child.try_wait() {
413                    return;
414                }
415                if std::time::Instant::now() >= deadline {
416                    break;
417                }
418                std::thread::sleep(std::time::Duration::from_millis(50));
419            }
420            // SAFETY: same invariants as the SIGTERM call above.
421            #[cfg(unix)]
422            unsafe {
423                libc::kill(-self.pgid, libc::SIGKILL);
424            }
425            // Fallback: direct kill in case setpgid failed in pre_exec
426            let _ = child.kill();
427            let _ = child.wait();
428        }
429    }
430}
431
432/// Read lines from a pipe. Stores up to `MAX_OUTPUT_LINES` but continues
433/// draining the pipe after that to prevent the child from blocking.
434fn read_pipe_capped<R: io::Read>(reader: R) -> String {
435    use io::BufRead;
436    let mut reader = io::BufReader::new(reader);
437    let mut output = String::new();
438    let mut line_count = 0;
439    let mut capped = false;
440    let mut buf = Vec::new();
441    loop {
442        buf.clear();
443        match reader.read_until(b'\n', &mut buf) {
444            Ok(0) => break, // EOF
445            Ok(_) => {
446                if !capped {
447                    if line_count < MAX_OUTPUT_LINES {
448                        if line_count > 0 {
449                            output.push('\n');
450                        }
451                        // Strip trailing newline (and \r for CRLF)
452                        if buf.last() == Some(&b'\n') {
453                            buf.pop();
454                            if buf.last() == Some(&b'\r') {
455                                buf.pop();
456                            }
457                        }
458                        // Lossy conversion handles non-UTF-8 output
459                        output.push_str(&String::from_utf8_lossy(&buf));
460                        line_count += 1;
461                    } else {
462                        output.push_str("\n[Output truncated at 10,000 lines]");
463                        capped = true;
464                    }
465                }
466                // If capped, keep reading but discard to drain the pipe
467            }
468            Err(_) => break,
469        }
470    }
471    output
472}
473
474/// Build the base SSH command with shared options for snippet execution.
475/// Sets -F, ConnectTimeout, ControlMaster/ControlPath and ClearAllForwardings.
476/// Also configures askpass and Bitwarden session env vars.
477///
478/// When `non_interactive` is true, adds `-o StrictHostKeyChecking=yes` so an
479/// unknown host returns an error instead of writing a prompt to the controlling
480/// tty. Background fetches (container listings, file browser listings, captured
481/// snippet output) pass `true`. Direct CLI use passes `false` so users retain
482/// normal host-key trust-on-first-use behaviour.
483fn base_ssh_command(
484    alias: &str,
485    config_path: &Path,
486    command: &str,
487    askpass: Option<&str>,
488    bw_session: Option<&str>,
489    has_active_tunnel: bool,
490    non_interactive: bool,
491) -> Command {
492    let mut cmd = Command::new("ssh");
493    cmd.arg("-F")
494        .arg(config_path)
495        .arg("-o")
496        .arg("ConnectTimeout=10")
497        .arg("-o")
498        .arg("ControlMaster=no")
499        .arg("-o")
500        .arg("ControlPath=none");
501
502    if non_interactive {
503        cmd.arg("-o").arg("StrictHostKeyChecking=yes");
504    }
505
506    if has_active_tunnel {
507        cmd.arg("-o").arg("ClearAllForwardings=yes");
508    }
509
510    cmd.arg("--").arg(alias).arg(command);
511
512    if askpass.is_some() {
513        crate::askpass_env::configure_ssh_command(&mut cmd, alias, config_path);
514    }
515
516    if let Some(token) = bw_session {
517        cmd.env("BW_SESSION", token);
518    }
519
520    cmd
521}
522
523/// Build the SSH Command for a snippet execution with piped I/O.
524fn build_snippet_command(
525    alias: &str,
526    config_path: &Path,
527    command: &str,
528    askpass: Option<&str>,
529    bw_session: Option<&str>,
530    has_active_tunnel: bool,
531) -> Command {
532    let mut cmd = base_ssh_command(
533        alias,
534        config_path,
535        command,
536        askpass,
537        bw_session,
538        has_active_tunnel,
539        true,
540    );
541    cmd.stdin(Stdio::null())
542        .stdout(Stdio::piped())
543        .stderr(Stdio::piped());
544
545    // Isolate child into its own process group so we can kill the
546    // entire tree without affecting purple itself.
547    #[cfg(unix)]
548    // SAFETY: the pre-fork callback runs between fork and the exec syscall in
549    // the child; only async-signal-safe calls are permitted. `setpgid(0, 0)`
550    // is async-signal-safe per POSIX and does not touch Rust runtime state.
551    unsafe {
552        use std::os::unix::process::CommandExt;
553        cmd.pre_exec(|| {
554            libc::setpgid(0, 0);
555            Ok(())
556        });
557    }
558
559    cmd
560}
561
/// Execute a single host: spawn SSH, read output, wait, send result.
///
/// Blocks until the `ssh` child has closed both pipes and exited. Exactly
/// one `AppEvent::SnippetHostDone` is sent on `tx` for the host: with the
/// sanitized stdout/stderr and the exit code on success, or with the launch
/// error in `stderr` and `exit_code: None` when `ssh` could not be spawned.
/// Send failures (receiver gone) are ignored.
///
/// Returns the guard that wrapped the child, or `None` if spawning failed.
/// By the time the guard is returned the child has already been waited on
/// and removed from it, so dropping the guard does nothing.
///
/// NOTE(review): because the guard is only handed back after the child has
/// exited, callers cannot use it to stop a run that is still in flight.
fn execute_host(
    run_id: u64,
    ctx: &crate::ssh_context::SshContext<'_>,
    command: &str,
    tx: &std::sync::mpsc::Sender<crate::event::AppEvent>,
) -> Option<std::sync::Arc<ChildGuard>> {
    let alias = ctx.alias;
    let mut cmd = build_snippet_command(
        alias,
        ctx.config_path,
        command,
        ctx.askpass,
        ctx.bw_session,
        ctx.has_tunnel,
    );

    match cmd.spawn() {
        Ok(child) => {
            // From here on the guard owns the child: if this function
            // unwinds before the child is reaped, dropping the guard
            // terminates it.
            let guard = std::sync::Arc::new(ChildGuard::new(child));

            // Take stdout/stderr BEFORE wait to avoid pipe deadlock.
            // A poisoned lock is recovered rather than propagated.
            let stdout_pipe = {
                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
                lock.as_mut().and_then(|c| c.stdout.take())
            };
            let stderr_pipe = {
                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
                lock.as_mut().and_then(|c| c.stderr.take())
            };

            // Spawn one reader thread per pipe so a full stderr buffer
            // cannot stall the child while stdout is being read (and vice
            // versa).
            let stdout_handle = std::thread::spawn(move || match stdout_pipe {
                Some(pipe) => read_pipe_capped(pipe),
                None => String::new(),
            });
            let stderr_handle = std::thread::spawn(move || match stderr_pipe {
                Some(pipe) => read_pipe_capped(pipe),
                None => String::new(),
            });

            // Join readers BEFORE wait to guarantee all output is received.
            // A panicked reader is logged and treated as empty output.
            let stdout_text = stdout_handle.join().unwrap_or_else(|_| {
                log::warn!("[purple] Snippet stdout reader thread panicked");
                String::new()
            });
            let stderr_text = stderr_handle.join().unwrap_or_else(|_| {
                log::warn!("[purple] Snippet stderr reader thread panicked");
                String::new()
            });

            // Now wait for the child to exit, then take it out of the
            // guard so Drop won't kill a potentially recycled PID.
            let exit_code = {
                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
                let status = lock.as_mut().and_then(|c| c.wait().ok());
                let _ = lock.take(); // Prevent ChildGuard::drop from killing recycled PID
                // Map the status to a shell-style code: the exit code if
                // there is one, otherwise (Unix only) 128 + the signal
                // number. `None` if the wait itself failed.
                status.and_then(|s| {
                    #[cfg(unix)]
                    {
                        use std::os::unix::process::ExitStatusExt;
                        s.code().or_else(|| s.signal().map(|sig| 128 + sig))
                    }
                    #[cfg(not(unix))]
                    {
                        s.code()
                    }
                })
            };

            // Output is sanitized before it leaves this function.
            let _ = tx.send(crate::event::AppEvent::SnippetHostDone {
                run_id,
                alias: alias.to_string(),
                stdout: sanitize_output(&stdout_text),
                stderr: sanitize_output(&stderr_text),
                exit_code,
            });

            Some(guard)
        }
        Err(e) => {
            log::warn!(
                "[external] snippet ssh spawn failed: run_id={} alias={} err={}",
                run_id,
                alias,
                e
            );
            // Report the launch failure through the same event so the
            // receiver still gets one result per host.
            let _ = tx.send(crate::event::AppEvent::SnippetHostDone {
                run_id,
                alias: alias.to_string(),
                stdout: String::new(),
                stderr: crate::messages::snippet_ssh_launch_failed(&e),
                exit_code: None,
            });
            None
        }
    }
}
660
/// Spawn background snippet execution on multiple hosts.
/// The coordinator thread drives sequential or parallel host iteration.
///
/// Returns immediately; all work happens on a thread named
/// `snippet-coordinator`. For every host that is started, `tx` receives a
/// `SnippetHostDone` (sent by `execute_host`) followed by a
/// `SnippetProgress`; a final `SnippetAllDone` is sent when the coordinator
/// finishes, including after cancellation.
///
/// * `run_id` - echoed in every event.
/// * `askpass_map` - one `(alias, askpass)` pair per host; its length is the
///   `total` reported in progress events.
/// * `config_path` - SSH config file passed to every `ssh` invocation.
/// * `env` - runtime environment handed to each host's `SshContext`.
/// * `command` - the command to run on every host.
/// * `bw_session` - optional session token forwarded to each host context.
/// * `tunnel_aliases` - hosts in this set are run with `has_tunnel = true`.
/// * `cancel` - checked before each host is started (and while waiting for
///   a free slot in parallel mode); once set, no further hosts are started.
///   NOTE(review): hosts already running are not interrupted by this flag.
/// * `parallel` - when true and there is more than one host, up to 20 hosts
///   run concurrently; otherwise hosts run one after another.
///
/// # Panics
/// Panics if the coordinator thread cannot be spawned.
#[allow(clippy::too_many_arguments)]
pub fn spawn_snippet_execution(
    run_id: u64,
    askpass_map: Vec<(String, Option<String>)>,
    config_path: PathBuf,
    env: std::sync::Arc<crate::runtime::env::Env>,
    command: String,
    bw_session: Option<String>,
    tunnel_aliases: std::collections::HashSet<String>,
    cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
    tx: std::sync::mpsc::Sender<crate::event::AppEvent>,
    parallel: bool,
) {
    let total = askpass_map.len();
    // Upper bound on simultaneously running ssh children in parallel mode.
    let max_concurrent: usize = 20;

    std::thread::Builder::new()
        .name("snippet-coordinator".into())
        .spawn(move || {
            // Guards returned by execute_host are collected here and
            // dropped together when the coordinator finishes.
            let guards: std::sync::Arc<std::sync::Mutex<Vec<std::sync::Arc<ChildGuard>>>> =
                std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));

            if parallel && total > 1 {
                // Slot-based semaphore for concurrency limiting: the channel
                // is pre-filled with one token per allowed worker; a worker
                // is started only after a token is received and returns the
                // token when it finishes.
                let (slot_tx, slot_rx) = std::sync::mpsc::channel::<()>();
                for _ in 0..max_concurrent.min(total) {
                    let _ = slot_tx.send(());
                }

                let completed = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
                let mut worker_handles = Vec::new();

                for (alias, askpass) in askpass_map {
                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
                        break;
                    }

                    // Wait for a slot, checking cancel periodically
                    loop {
                        match slot_rx.recv_timeout(std::time::Duration::from_millis(100)) {
                            Ok(()) => break,
                            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                                if cancel.load(std::sync::atomic::Ordering::Relaxed) {
                                    break;
                                }
                            }
                            Err(_) => break, // channel closed
                        }
                    }

                    // Re-check: the wait loop above also exits on cancel.
                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
                        break;
                    }

                    // Per-worker copies of everything the thread needs to own.
                    let config_path = config_path.clone();
                    let env = std::sync::Arc::clone(&env);
                    let command = command.clone();
                    let bw_session = bw_session.clone();
                    let has_tunnel = tunnel_aliases.contains(&alias);
                    let tx = tx.clone();
                    let slot_tx = slot_tx.clone();
                    let guards = guards.clone();
                    let completed = completed.clone();
                    let total = total;

                    let handle = std::thread::spawn(move || {
                        // RAII guard: release semaphore slot even on panic
                        struct SlotRelease(Option<std::sync::mpsc::Sender<()>>);
                        impl Drop for SlotRelease {
                            fn drop(&mut self) {
                                if let Some(tx) = self.0.take() {
                                    let _ = tx.send(());
                                }
                            }
                        }
                        let _slot = SlotRelease(Some(slot_tx));

                        let host_ctx = crate::ssh_context::SshContext {
                            alias: &alias,
                            config_path: &config_path,
                            askpass: askpass.as_deref(),
                            bw_session: bw_session.as_deref(),
                            has_tunnel,
                            env: &env,
                        };
                        // Blocks until this host's ssh child has finished.
                        let guard = execute_host(run_id, &host_ctx, &command, &tx);

                        // Keep the guard alive until the coordinator ends.
                        if let Some(g) = guard {
                            guards.lock().unwrap_or_else(|e| e.into_inner()).push(g);
                        }

                        // fetch_add returns the previous value; +1 gives the
                        // number of hosts finished including this one.
                        let c = completed.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                        let _ = tx.send(crate::event::AppEvent::SnippetProgress {
                            run_id,
                            completed: c,
                            total,
                        });
                        // _slot dropped here, releasing semaphore
                    });
                    worker_handles.push(handle);
                }

                // Wait for all workers to finish
                for handle in worker_handles {
                    let _ = handle.join();
                }
            } else {
                // Sequential execution
                for (i, (alias, askpass)) in askpass_map.into_iter().enumerate() {
                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
                        break;
                    }

                    let has_tunnel = tunnel_aliases.contains(&alias);
                    let host_ctx = crate::ssh_context::SshContext {
                        alias: &alias,
                        config_path: &config_path,
                        askpass: askpass.as_deref(),
                        bw_session: bw_session.as_deref(),
                        has_tunnel,
                        env: &env,
                    };
                    // Blocks until this host's ssh child has finished.
                    let guard = execute_host(run_id, &host_ctx, &command, &tx);

                    if let Some(g) = guard {
                        guards.lock().unwrap_or_else(|e| e.into_inner()).push(g);
                    }

                    let _ = tx.send(crate::event::AppEvent::SnippetProgress {
                        run_id,
                        completed: i + 1,
                        total,
                    });
                }
            }

            // Always sent, whether the run completed or was cancelled.
            let _ = tx.send(crate::event::AppEvent::SnippetAllDone { run_id });
            // Guards dropped here, cleaning up any remaining children
        })
        .expect("failed to spawn snippet coordinator");
}
805
806/// Run a snippet on a single host via SSH.
807/// When `capture` is true, stdout/stderr are piped and returned in the result.
808/// When `capture` is false, stdout/stderr are inherited (streamed to terminal
809/// in real-time) and the returned strings are empty.
810#[allow(clippy::too_many_arguments)]
811pub fn run_snippet(
812    alias: &str,
813    config_path: &Path,
814    env: &crate::runtime::env::Env,
815    command: &str,
816    askpass: Option<&str>,
817    bw_session: Option<&str>,
818    capture: bool,
819    has_active_tunnel: bool,
820) -> anyhow::Result<SnippetResult> {
821    // Renew the Vault SSH cert before connecting so container listing,
822    // inspect, logs, actions and file-browser operations get a fresh cert
823    // just like the interactive connect path does. No-op for non-vault hosts.
824    crate::runtime::helpers::ensure_vault_cert_for_alias(env, alias, config_path);
825
826    let mut cmd = base_ssh_command(
827        alias,
828        config_path,
829        command,
830        askpass,
831        bw_session,
832        has_active_tunnel,
833        capture,
834    );
835
836    if capture {
837        cmd.stdin(Stdio::null())
838            .stdout(Stdio::piped())
839            .stderr(Stdio::piped());
840    } else {
841        cmd.stdin(Stdio::inherit())
842            .stdout(Stdio::inherit())
843            .stderr(Stdio::inherit());
844    }
845
846    if capture {
847        let (status, stdout, stderr) = run_with_bounded_output(&mut cmd, alias)?;
848        Ok(SnippetResult {
849            status,
850            stdout,
851            stderr,
852        })
853    } else {
854        let status = cmd
855            .status()
856            .map_err(|e| anyhow::anyhow!("Failed to run ssh for '{}': {}", alias, e))?;
857
858        Ok(SnippetResult {
859            status,
860            stdout: String::new(),
861            stderr: String::new(),
862        })
863    }
864}
865
/// Upper bound, in bytes, on how much stdout or stderr is kept from a single
/// SSH child process (16 MiB).
///
/// A hostile or misbehaving remote can emit output without end and exhaust
/// purple's memory. Legitimate `docker inspect` / `docker logs --tail 200` /
/// `ps -a` output peaks at hundreds of KB, so this limit sits far above
/// normal use while staying below what would meaningfully stress the parse
/// pipelines or the terminal buffer.
pub const SSH_OUTPUT_MAX_BYTES: usize = 16 << 20;
873
874/// Spawn a child with piped stdout/stderr and read each pipe with a
875/// hard byte cap. Once the cap is hit the remaining bytes are drained
876/// to a sink (so the child can exit cleanly) and a debug log records
877/// the truncation. Returns the captured payload as lossy-UTF8 Strings
878/// alongside the child's exit status.
879fn run_with_bounded_output(
880    cmd: &mut Command,
881    alias: &str,
882) -> anyhow::Result<(ExitStatus, String, String)> {
883    let mut child = cmd
884        .spawn()
885        .map_err(|e| anyhow::anyhow!("Failed to spawn ssh for '{}': {}", alias, e))?;
886    let stdout = child.stdout.take();
887    let stderr = child.stderr.take();
888
889    let alias_for_stdout = alias.to_string();
890    let stdout_handle = std::thread::spawn(move || match stdout {
891        Some(mut pipe) => {
892            read_bounded(&mut pipe, SSH_OUTPUT_MAX_BYTES, &alias_for_stdout, "stdout")
893        }
894        None => Vec::new(),
895    });
896    let alias_for_stderr = alias.to_string();
897    let stderr_handle = std::thread::spawn(move || match stderr {
898        Some(mut pipe) => {
899            read_bounded(&mut pipe, SSH_OUTPUT_MAX_BYTES, &alias_for_stderr, "stderr")
900        }
901        None => Vec::new(),
902    });
903
904    // Join the drain threads BEFORE waiting on the child. The threads
905    // own the pipe read-ends; when they finish (cap hit or EOF) the
906    // reader handle drops and closes its side of the pipe, which
907    // unblocks any pending write on the remote child. Waiting on the
908    // child first would deadlock the moment stdout exceeded the kernel
909    // pipe buffer (typically 64 KB on Linux): the child blocks on
910    // write, the parent blocks on wait, neither thread runs.
911    let stdout_bytes = stdout_handle.join().unwrap_or_default();
912    let stderr_bytes = stderr_handle.join().unwrap_or_default();
913    let status = child
914        .wait()
915        .map_err(|e| anyhow::anyhow!("ssh wait failed for '{}': {}", alias, e))?;
916
917    Ok((
918        status,
919        String::from_utf8_lossy(&stdout_bytes).to_string(),
920        String::from_utf8_lossy(&stderr_bytes).to_string(),
921    ))
922}
923
924fn read_bounded<R: std::io::Read>(
925    reader: &mut R,
926    max: usize,
927    alias: &str,
928    stream: &str,
929) -> Vec<u8> {
930    let mut out = Vec::with_capacity(8 * 1024);
931    let mut chunk = [0u8; 8 * 1024];
932    loop {
933        match reader.read(&mut chunk) {
934            Ok(0) => break,
935            Ok(n) => {
936                if out.len() + n > max {
937                    let allowed = max.saturating_sub(out.len());
938                    out.extend_from_slice(&chunk[..allowed]);
939                    log::warn!(
940                        "[external] ssh {} for '{}' exceeded {} bytes; truncating remainder",
941                        stream,
942                        alias,
943                        max
944                    );
945                    // Drain remaining bytes so the child can exit cleanly
946                    // instead of blocking on a backpressured pipe.
947                    let _ = std::io::copy(reader, &mut std::io::sink());
948                    break;
949                }
950                out.extend_from_slice(&chunk[..n]);
951            }
952            Err(e) => {
953                log::debug!("[external] ssh {stream} read error for '{alias}': {e}");
954                break;
955            }
956        }
957    }
958    out
959}
960
961#[cfg(test)]
962#[path = "snippet_tests.rs"]
963mod tests;