Skip to main content

purple_ssh/
snippet.rs

1use std::collections::HashMap;
2use std::io;
3use std::path::{Path, PathBuf};
4use std::process::{Command, ExitStatus, Stdio};
5
6use crate::fs_util;
7
8/// A saved command snippet.
9#[derive(Debug, Clone, PartialEq)]
10pub struct Snippet {
11    pub name: String,
12    pub command: String,
13    pub description: String,
14}
15
16/// Result of running a snippet on a host.
17pub struct SnippetResult {
18    pub status: ExitStatus,
19    pub stdout: String,
20    pub stderr: String,
21}
22
23/// Snippet storage backed by ~/.purple/snippets (INI-style).
24#[derive(Debug, Clone, Default)]
25pub struct SnippetStore {
26    pub snippets: Vec<Snippet>,
27    /// Override path for save(). None uses the default ~/.purple/snippets.
28    pub path_override: Option<PathBuf>,
29    /// Default target host aliases per snippet name, persisted as the optional
30    /// `hosts=` line. Pre-selected when the run picker opens. Kept out of the
31    /// `Snippet` struct so existing call sites that build snippets stay
32    /// untouched; the association is by name.
33    pub targets: HashMap<String, Vec<String>>,
34}
35
36fn config_path(paths: Option<&crate::runtime::env::Paths>) -> Option<PathBuf> {
37    paths.map(crate::runtime::env::Paths::snippets_dir)
38}
39
40impl SnippetStore {
41    /// Load snippets from `~/.purple/snippets`, resolved from the injected
42    /// `paths`. The resolved path is stored as `path_override` so a later
43    /// `save()` writes back to the same location without re-resolving.
44    /// Returns an empty store when the file does not exist (normal
45    /// first-use) or when no home directory is known.
46    pub fn load(paths: Option<&crate::runtime::env::Paths>) -> Self {
47        let path = match config_path(paths) {
48            Some(p) => p,
49            None => return Self::default(),
50        };
51        let content = match std::fs::read_to_string(&path) {
52            Ok(c) => c,
53            Err(e) if e.kind() == io::ErrorKind::NotFound => {
54                return Self {
55                    path_override: Some(path),
56                    ..Self::default()
57                };
58            }
59            Err(e) => {
60                log::warn!("[config] Could not read {}: {}", path.display(), e);
61                return Self {
62                    path_override: Some(path),
63                    ..Self::default()
64                };
65            }
66        };
67        Self {
68            path_override: Some(path),
69            ..Self::parse(&content)
70        }
71    }
72
73    /// Parse INI-style snippet config.
74    pub fn parse(content: &str) -> Self {
75        let mut snippets = Vec::new();
76        let mut targets: HashMap<String, Vec<String>> = HashMap::new();
77        let mut current: Option<Snippet> = None;
78
79        for line in content.lines() {
80            let trimmed = line.trim();
81            if trimmed.is_empty() || trimmed.starts_with('#') {
82                continue;
83            }
84            if trimmed.starts_with('[') && trimmed.ends_with(']') {
85                if let Some(snippet) = current.take() {
86                    if !snippet.command.is_empty()
87                        && !snippets.iter().any(|s: &Snippet| s.name == snippet.name)
88                    {
89                        snippets.push(snippet);
90                    }
91                }
92                let name = trimmed[1..trimmed.len() - 1].trim().to_string();
93                if snippets.iter().any(|s| s.name == name) {
94                    current = None;
95                    continue;
96                }
97                current = Some(Snippet {
98                    name,
99                    command: String::new(),
100                    description: String::new(),
101                });
102            } else if let Some(ref mut snippet) = current {
103                if let Some((key, value)) = trimmed.split_once('=') {
104                    let key = key.trim();
105                    // Trim whitespace around key but preserve value content
106                    // (only trim leading whitespace after '=', not trailing)
107                    let value = value.trim_start().to_string();
108                    match key {
109                        "command" => snippet.command = value,
110                        "description" => snippet.description = value,
111                        "hosts" => {
112                            let aliases: Vec<String> = value
113                                .split(',')
114                                .map(|a| unescape_alias(a.trim()))
115                                .filter(|a| !a.is_empty())
116                                .collect();
117                            if !aliases.is_empty() {
118                                targets.insert(snippet.name.clone(), aliases);
119                            }
120                        }
121                        _ => {}
122                    }
123                }
124            }
125        }
126        if let Some(snippet) = current {
127            if !snippet.command.is_empty() && !snippets.iter().any(|s| s.name == snippet.name) {
128                snippets.push(snippet);
129            }
130        }
131        // Drop targets for sections that did not yield a snippet (no command,
132        // or a duplicate name) so the map never holds orphans.
133        targets.retain(|name, _| snippets.iter().any(|s| &s.name == name));
134        Self {
135            snippets,
136            path_override: None,
137            targets,
138        }
139    }
140
141    /// Save snippets to ~/.purple/snippets (atomic write, chmod 600).
142    pub fn save(&self) -> io::Result<()> {
143        if crate::demo_flag::is_demo() {
144            return Ok(());
145        }
146        let Some(path) = self.path_override.clone() else {
147            return Err(io::Error::new(
148                io::ErrorKind::NotFound,
149                "Could not determine home directory",
150            ));
151        };
152
153        let mut content = String::new();
154        for (i, snippet) in self.snippets.iter().enumerate() {
155            if i > 0 {
156                content.push('\n');
157            }
158            content.push_str(&format!("[{}]\n", snippet.name));
159            content.push_str(&format!("command={}\n", snippet.command));
160            if !snippet.description.is_empty() {
161                content.push_str(&format!("description={}\n", snippet.description));
162            }
163            if let Some(hosts) = self.targets.get(&snippet.name) {
164                if !hosts.is_empty() {
165                    let joined = hosts
166                        .iter()
167                        .map(|a| escape_alias(a))
168                        .collect::<Vec<_>>()
169                        .join(",");
170                    content.push_str(&format!("hosts={joined}\n"));
171                }
172            }
173        }
174
175        fs_util::atomic_write(&path, content.as_bytes())
176    }
177
178    /// Default target host aliases saved for `name` (empty when none).
179    pub fn targets_for(&self, name: &str) -> &[String] {
180        self.targets.get(name).map(|v| v.as_slice()).unwrap_or(&[])
181    }
182
183    /// Set (or clear, when empty) the default target host aliases for `name`.
184    pub fn set_targets(&mut self, name: &str, aliases: Vec<String>) {
185        if aliases.is_empty() {
186            self.targets.remove(name);
187        } else {
188            self.targets.insert(name.to_string(), aliases);
189        }
190    }
191
192    /// Get a snippet by name.
193    pub fn get(&self, name: &str) -> Option<&Snippet> {
194        self.snippets.iter().find(|s| s.name == name)
195    }
196
197    /// Add or replace a snippet.
198    pub fn set(&mut self, snippet: Snippet) {
199        if let Some(existing) = self.snippets.iter_mut().find(|s| s.name == snippet.name) {
200            *existing = snippet;
201        } else {
202            self.snippets.push(snippet);
203        }
204    }
205
206    /// Remove a snippet by name, dropping its saved targets too.
207    pub fn remove(&mut self, name: &str) {
208        self.snippets.retain(|s| s.name != name);
209        self.targets.remove(name);
210    }
211}
212
213/// Percent-encode the comma (and the percent sign itself) in a host alias so it
214/// survives the comma-joined `hosts=` line. A host whose SSH `Host` line uses a
215/// comma (e.g. `Host web1,web2`) is a single concrete alias containing a comma;
216/// without escaping it would shred into phantom aliases on reload.
217fn escape_alias(alias: &str) -> String {
218    alias.replace('%', "%25").replace(',', "%2C")
219}
220
221/// Inverse of [`escape_alias`]. Unescape order is the reverse of escape order so
222/// a literal `%2C` in an alias round-trips intact.
223fn unescape_alias(token: &str) -> String {
224    token.replace("%2C", ",").replace("%25", "%")
225}
226
227/// Validate a snippet name: non-empty, no leading/trailing whitespace,
228/// no `#`, no `[`, no `]`, no control characters.
229pub fn validate_name(name: &str) -> Result<(), String> {
230    if name.trim().is_empty() {
231        return Err(crate::messages::SNIPPET_NAME_EMPTY.to_string());
232    }
233    if name != name.trim() {
234        return Err(crate::messages::SNIPPET_NAME_WHITESPACE.to_string());
235    }
236    if name.contains('#') || name.contains('[') || name.contains(']') {
237        return Err(crate::messages::SNIPPET_NAME_INVALID_CHARS.to_string());
238    }
239    if name.contains(|c: char| c.is_control()) {
240        return Err(crate::messages::SNIPPET_NAME_CONTROL_CHARS.to_string());
241    }
242    Ok(())
243}
244
245/// Validate a snippet command: non-empty, no control characters (except tab).
246pub fn validate_command(command: &str) -> Result<(), String> {
247    if command.trim().is_empty() {
248        return Err(crate::messages::SNIPPET_COMMAND_EMPTY.to_string());
249    }
250    if command.contains(|c: char| c.is_control() && c != '\t') {
251        return Err(crate::messages::SNIPPET_COMMAND_CONTROL_CHARS.to_string());
252    }
253    Ok(())
254}
255
256// =========================================================================
257// Parameter support
258// =========================================================================
259
260/// A parameter found in a snippet command template.
261#[derive(Debug, Clone, PartialEq)]
262pub struct SnippetParam {
263    pub name: String,
264    pub default: Option<String>,
265}
266
267/// Shell-escape a string with single quotes (POSIX).
268/// Internal single quotes are escaped as `'\''`.
269pub fn shell_escape(s: &str) -> String {
270    format!("'{}'", s.replace('\'', "'\\''"))
271}
272
273/// Parse `{{name}}` and `{{name:default}}` from a command string.
274/// Returns params in order of first appearance, deduplicated. Max 20 params.
275pub fn parse_params(command: &str) -> Vec<SnippetParam> {
276    let mut params = Vec::new();
277    let mut seen = std::collections::HashSet::new();
278    let bytes = command.as_bytes();
279    let len = bytes.len();
280    let mut i = 0;
281    while i + 3 < len {
282        if bytes[i] == b'{' && bytes.get(i + 1) == Some(&b'{') {
283            if let Some(end) = command[i + 2..].find("}}") {
284                let inner = &command[i + 2..i + 2 + end];
285                let (name, default) = if let Some((n, d)) = inner.split_once(':') {
286                    (n.to_string(), Some(d.to_string()))
287                } else {
288                    (inner.to_string(), None)
289                };
290                if validate_param_name(&name).is_ok() && !seen.contains(&name) && params.len() < 20
291                {
292                    seen.insert(name.clone());
293                    params.push(SnippetParam { name, default });
294                }
295                i = i + 2 + end + 2;
296                continue;
297            }
298        }
299        i += 1;
300    }
301    params
302}
303
304/// Count the distinct valid `{{name}}` parameters in a command without
305/// allocating the full parsed list. Equal to `parse_params(command).len()`;
306/// used by the list row, which only needs the count, on every frame.
307pub fn count_params(command: &str) -> usize {
308    let mut seen: Vec<&str> = Vec::new();
309    let bytes = command.as_bytes();
310    let len = bytes.len();
311    let mut i = 0;
312    while i + 3 < len {
313        if bytes[i] == b'{' && bytes.get(i + 1) == Some(&b'{') {
314            if let Some(end) = command[i + 2..].find("}}") {
315                let inner = &command[i + 2..i + 2 + end];
316                let name = inner.split_once(':').map_or(inner, |(n, _)| n);
317                if validate_param_name(name).is_ok() && !seen.contains(&name) && seen.len() < 20 {
318                    seen.push(name);
319                }
320                i = i + 2 + end + 2;
321                continue;
322            }
323        }
324        i += 1;
325    }
326    seen.len()
327}
328
329/// Validate a parameter name: non-empty, alphanumeric/underscore/hyphen only.
330/// Rejects `{`, `}`, `'`, whitespace and control chars.
331pub fn validate_param_name(name: &str) -> Result<(), String> {
332    if name.is_empty() {
333        return Err(crate::messages::SNIPPET_PARAM_NAME_EMPTY.to_string());
334    }
335    if !name
336        .chars()
337        .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
338    {
339        return Err(crate::messages::snippet_param_name_invalid(name));
340    }
341    Ok(())
342}
343
344/// Substitute parameters into a command template (single-pass).
345/// All values (user-provided and defaults) are shell-escaped.
346pub fn substitute_params(
347    command: &str,
348    values: &std::collections::HashMap<String, String>,
349) -> String {
350    let mut result = String::with_capacity(command.len());
351    let bytes = command.as_bytes();
352    let len = bytes.len();
353    let mut i = 0;
354    while i < len {
355        if i + 3 < len && bytes[i] == b'{' && bytes[i + 1] == b'{' {
356            if let Some(end) = command[i + 2..].find("}}") {
357                let inner = &command[i + 2..i + 2 + end];
358                let (name, default) = if let Some((n, d)) = inner.split_once(':') {
359                    (n, Some(d))
360                } else {
361                    (inner, None)
362                };
363                // Only a valid name is a parameter. parse_params and count_params
364                // gate the same way, so an invalid placeholder (e.g. `{{a b}}`)
365                // stays literal here instead of being silently rewritten to ''.
366                if validate_param_name(name).is_ok() {
367                    let value = values
368                        .get(name)
369                        .filter(|v| !v.is_empty())
370                        .map(|v| v.as_str())
371                        .or(default)
372                        .unwrap_or("");
373                    result.push_str(&shell_escape(value));
374                    i = i + 2 + end + 2;
375                    continue;
376                }
377            }
378        }
379        // Properly decode UTF-8 character (not byte-level cast)
380        let ch = command[i..].chars().next().unwrap();
381        result.push(ch);
382        i += ch.len_utf8();
383    }
384    result
385}
386
387// =========================================================================
388// Output sanitization
389// =========================================================================
390
391/// Strip ANSI escape sequences and C1 control codes from output.
392/// Handles CSI, OSC, DCS, SOS, PM and APC sequences plus the C1 range 0x80-0x9F.
393pub fn sanitize_output(input: &str) -> String {
394    let mut out = String::with_capacity(input.len());
395    let mut chars = input.chars().peekable();
396    while let Some(c) = chars.next() {
397        match c {
398            '\x1b' => {
399                match chars.peek() {
400                    Some('[') => {
401                        chars.next();
402                        // CSI: consume until 0x40-0x7E
403                        while let Some(&ch) = chars.peek() {
404                            chars.next();
405                            if ('\x40'..='\x7e').contains(&ch) {
406                                break;
407                            }
408                        }
409                    }
410                    Some(']') | Some('P') | Some('X') | Some('^') | Some('_') => {
411                        chars.next();
412                        // OSC/DCS/SOS/PM/APC: consume until ST (ESC\) or BEL
413                        consume_until_st(&mut chars);
414                    }
415                    _ => {
416                        // Single ESC + one char
417                        chars.next();
418                    }
419                }
420            }
421            '\u{009b}' => {
422                // 8-bit CSI (the single-char form of ESC[): consume the
423                // parameter bytes up to the final byte 0x40-0x7E so a colour or
424                // cursor sequence does not leak its parameters into the TUI as
425                // literal text. The terminator is bounded, so this never eats
426                // arbitrary trailing output (unlike a consume-until-ST string
427                // sequence, which the other C1 codes are left to strip
428                // byte-by-byte instead).
429                while let Some(&ch) = chars.peek() {
430                    chars.next();
431                    if ('\x40'..='\x7e').contains(&ch) {
432                        break;
433                    }
434                }
435            }
436            c if ('\u{0080}'..='\u{009F}').contains(&c) => {
437                // Other C1 control codes: skip the lone byte.
438            }
439            c if c.is_control() && c != '\n' && c != '\t' => {
440                // Other control chars (except newline/tab): skip
441            }
442            _ => out.push(c),
443        }
444    }
445    out
446}
447
448/// Consume chars until String Terminator (ESC\) or BEL (\x07).
449fn consume_until_st(chars: &mut std::iter::Peekable<std::str::Chars<'_>>) {
450    while let Some(&ch) = chars.peek() {
451        if ch == '\x07' {
452            chars.next();
453            break;
454        }
455        if ch == '\x1b' {
456            chars.next();
457            if chars.peek() == Some(&'\\') {
458                chars.next();
459            }
460            break;
461        }
462        chars.next();
463    }
464}
465
466// =========================================================================
467// Background snippet execution
468// =========================================================================
469
470/// Maximum lines stored per host. Reader continues draining beyond this
471/// to prevent child from blocking on a full pipe buffer.
472const MAX_OUTPUT_LINES: usize = 10_000;
473
474/// RAII guard that kills the process group on drop.
475/// Uses SIGTERM first, then escalates to SIGKILL after a brief wait.
476pub struct ChildGuard {
477    inner: std::sync::Mutex<Option<std::process::Child>>,
478    pgid: i32,
479}
480
481impl ChildGuard {
482    fn new(child: std::process::Child) -> Self {
483        // i32::try_from avoids silent overflow for PIDs > i32::MAX. Fallback 0
484        // is a sentinel the Drop guard treats as "no process group": it skips
485        // the group signal entirely (a negative pgid built from 0 or 1 would
486        // target our own group or PID 1/init) and falls back to child.kill().
487        // In practice Linux caps PIDs well below i32::MAX.
488        let pgid = i32::try_from(child.id()).unwrap_or(0);
489        Self {
490            inner: std::sync::Mutex::new(Some(child)),
491            pgid,
492        }
493    }
494}
495
496impl Drop for ChildGuard {
497    fn drop(&mut self) {
498        let mut lock = self.inner.lock().unwrap_or_else(|e| e.into_inner());
499        if let Some(ref mut child) = *lock {
500            // Already exited? Skip kill entirely (PID may be recycled).
501            if let Ok(Some(_)) = child.try_wait() {
502                return;
503            }
504            // Group signal only for a valid pgid (> 1). pgid 0 is the overflow
505            // sentinel; a negative pgid built from 0 or 1 would hit our own
506            // group or PID 1 (init), so skip straight to the direct child.kill().
507            #[cfg(unix)]
508            if self.pgid > 1 {
509                // SAFETY: self.pgid was set by setpgid(0,0) in pre_exec and is
510                // valid for the lifetime of this SnippetChild. kill() with a
511                // negative PID sends the signal to the entire process group.
512                // ESRCH (process already exited) is the expected race; the
513                // return value is intentionally ignored.
514                unsafe {
515                    libc::kill(-self.pgid, libc::SIGTERM);
516                }
517                // Poll for up to 500ms
518                let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
519                loop {
520                    if let Ok(Some(_)) = child.try_wait() {
521                        return;
522                    }
523                    if std::time::Instant::now() >= deadline {
524                        break;
525                    }
526                    std::thread::sleep(std::time::Duration::from_millis(50));
527                }
528                // SAFETY: same invariants as the SIGTERM call above.
529                unsafe {
530                    libc::kill(-self.pgid, libc::SIGKILL);
531                }
532            }
533            // Fallback: direct kill in case setpgid failed in pre_exec or the
534            // pgid was the overflow sentinel.
535            let _ = child.kill();
536            let _ = child.wait();
537        }
538    }
539}
540
541/// Read a pipe into a String under two hard bounds: a total byte cap
542/// (`SSH_OUTPUT_MAX_BYTES`) and a line cap (`MAX_OUTPUT_LINES`). The byte cap is
543/// applied first via [`read_bounded`], which reads fixed chunks and drains the
544/// remainder to a sink once the cap is hit, so a single newline-free megastream
545/// from a hostile remote can never buffer unbounded memory. The bounded bytes
546/// are then split into at most `MAX_OUTPUT_LINES` lines.
547fn read_pipe_capped<R: io::Read>(reader: R, alias: &str, stream: &str) -> String {
548    use io::BufRead;
549    let mut br = io::BufReader::new(reader);
550    let bounded = read_bounded(&mut br, SSH_OUTPUT_MAX_BYTES, alias, stream);
551
552    let mut output = String::new();
553    let mut line_count = 0;
554    let mut rdr = io::BufReader::new(bounded.as_slice());
555    let mut buf = Vec::new();
556    loop {
557        buf.clear();
558        match rdr.read_until(b'\n', &mut buf) {
559            Ok(0) => break, // EOF
560            Ok(_) => {
561                if line_count < MAX_OUTPUT_LINES {
562                    if line_count > 0 {
563                        output.push('\n');
564                    }
565                    // Strip trailing newline (and \r for CRLF)
566                    if buf.last() == Some(&b'\n') {
567                        buf.pop();
568                        if buf.last() == Some(&b'\r') {
569                            buf.pop();
570                        }
571                    }
572                    // Lossy conversion handles non-UTF-8 output
573                    output.push_str(&String::from_utf8_lossy(&buf));
574                    line_count += 1;
575                } else {
576                    output.push_str("\n[Output truncated at 10,000 lines]");
577                    break;
578                }
579            }
580            Err(_) => break,
581        }
582    }
583    output
584}
585
586/// Build the base SSH command with shared options for snippet execution.
587/// Sets -F, ConnectTimeout, ControlMaster/ControlPath and ClearAllForwardings.
588/// Also configures askpass and Bitwarden session env vars.
589///
590/// When `non_interactive` is true, adds `-o StrictHostKeyChecking=yes` so an
591/// unknown host returns an error instead of writing a prompt to the controlling
592/// tty. Background fetches (container listings, file browser listings, captured
593/// snippet output) pass `true`. Direct CLI use passes `false` so users retain
594/// normal host-key trust-on-first-use behaviour.
595fn base_ssh_command(
596    alias: &str,
597    config_path: &Path,
598    command: &str,
599    askpass: Option<&str>,
600    bw_session: Option<&str>,
601    has_active_tunnel: bool,
602    non_interactive: bool,
603) -> Command {
604    let mut cmd = Command::new("ssh");
605    cmd.arg("-F")
606        .arg(config_path)
607        .arg("-o")
608        .arg("ConnectTimeout=10")
609        .arg("-o")
610        .arg("ControlMaster=no")
611        .arg("-o")
612        .arg("ControlPath=none");
613
614    if non_interactive {
615        cmd.arg("-o").arg("StrictHostKeyChecking=yes");
616    }
617
618    if has_active_tunnel {
619        cmd.arg("-o").arg("ClearAllForwardings=yes");
620    }
621
622    cmd.arg("--").arg(alias).arg(command);
623
624    if askpass.is_some() {
625        crate::askpass_env::configure_ssh_command(&mut cmd, alias, config_path);
626    }
627
628    if let Some(token) = bw_session {
629        cmd.env("BW_SESSION", token);
630    }
631
632    cmd
633}
634
635/// Build the SSH Command for a snippet execution with piped I/O.
636fn build_snippet_command(
637    alias: &str,
638    config_path: &Path,
639    command: &str,
640    askpass: Option<&str>,
641    bw_session: Option<&str>,
642    has_active_tunnel: bool,
643) -> Command {
644    let mut cmd = base_ssh_command(
645        alias,
646        config_path,
647        command,
648        askpass,
649        bw_session,
650        has_active_tunnel,
651        true,
652    );
653    cmd.stdin(Stdio::null())
654        .stdout(Stdio::piped())
655        .stderr(Stdio::piped());
656
657    // Isolate child into its own process group so we can kill the
658    // entire tree without affecting purple itself.
659    #[cfg(unix)]
660    // SAFETY: the pre-fork callback runs between fork and the exec syscall in
661    // the child; only async-signal-safe calls are permitted. `setpgid(0, 0)`
662    // is async-signal-safe per POSIX and does not touch Rust runtime state.
663    unsafe {
664        use std::os::unix::process::CommandExt;
665        cmd.pre_exec(|| {
666            libc::setpgid(0, 0);
667            Ok(())
668        });
669    }
670
671    cmd
672}
673
674/// Execute a single host: spawn SSH, read output, wait, send result.
675fn execute_host(
676    run_id: u64,
677    ctx: &crate::ssh_context::SshContext<'_>,
678    command: &str,
679    tx: &std::sync::mpsc::Sender<crate::event::AppEvent>,
680) -> Option<std::sync::Arc<ChildGuard>> {
681    let alias = ctx.alias;
682    let mut cmd = build_snippet_command(
683        alias,
684        ctx.config_path,
685        command,
686        ctx.askpass,
687        ctx.bw_session,
688        ctx.has_tunnel,
689    );
690
691    match cmd.spawn() {
692        Ok(child) => {
693            let guard = std::sync::Arc::new(ChildGuard::new(child));
694
695            // Take stdout/stderr BEFORE wait to avoid pipe deadlock
696            let stdout_pipe = {
697                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
698                lock.as_mut().and_then(|c| c.stdout.take())
699            };
700            let stderr_pipe = {
701                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
702                lock.as_mut().and_then(|c| c.stderr.take())
703            };
704
705            // Spawn reader threads
706            let alias_out = alias.to_string();
707            let stdout_handle = std::thread::spawn(move || match stdout_pipe {
708                Some(pipe) => read_pipe_capped(pipe, &alias_out, "stdout"),
709                None => String::new(),
710            });
711            let alias_err = alias.to_string();
712            let stderr_handle = std::thread::spawn(move || match stderr_pipe {
713                Some(pipe) => read_pipe_capped(pipe, &alias_err, "stderr"),
714                None => String::new(),
715            });
716
717            // Join readers BEFORE wait to guarantee all output is received
718            let stdout_text = stdout_handle.join().unwrap_or_else(|_| {
719                log::warn!("[purple] Snippet stdout reader thread panicked");
720                String::new()
721            });
722            let stderr_text = stderr_handle.join().unwrap_or_else(|_| {
723                log::warn!("[purple] Snippet stderr reader thread panicked");
724                String::new()
725            });
726
727            // Now wait for the child to exit, then take it out of the
728            // guard so Drop won't kill a potentially recycled PID.
729            let exit_code = {
730                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
731                let status = lock.as_mut().and_then(|c| c.wait().ok());
732                let _ = lock.take(); // Prevent ChildGuard::drop from killing recycled PID
733                status.and_then(|s| {
734                    #[cfg(unix)]
735                    {
736                        use std::os::unix::process::ExitStatusExt;
737                        s.code().or_else(|| s.signal().map(|sig| 128 + sig))
738                    }
739                    #[cfg(not(unix))]
740                    {
741                        s.code()
742                    }
743                })
744            };
745
746            let _ = tx.send(crate::event::AppEvent::SnippetHostDone {
747                run_id,
748                alias: alias.to_string(),
749                stdout: sanitize_output(&stdout_text),
750                stderr: sanitize_output(&stderr_text),
751                exit_code,
752            });
753
754            Some(guard)
755        }
756        Err(e) => {
757            log::warn!(
758                "[external] snippet ssh spawn failed: run_id={} alias={} err={}",
759                run_id,
760                alias,
761                e
762            );
763            let _ = tx.send(crate::event::AppEvent::SnippetHostDone {
764                run_id,
765                alias: alias.to_string(),
766                stdout: String::new(),
767                stderr: crate::messages::snippet_ssh_launch_failed(&e),
768                exit_code: None,
769            });
770            None
771        }
772    }
773}
774
775/// Spawn background snippet execution on multiple hosts.
776/// The coordinator thread drives sequential or parallel host iteration.
777#[allow(clippy::too_many_arguments)]
778pub fn spawn_snippet_execution(
779    run_id: u64,
780    askpass_map: Vec<(String, Option<String>)>,
781    config_path: PathBuf,
782    env: std::sync::Arc<crate::runtime::env::Env>,
783    command: String,
784    bw_session: Option<String>,
785    tunnel_aliases: std::collections::HashSet<String>,
786    cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
787    tx: std::sync::mpsc::Sender<crate::event::AppEvent>,
788    parallel: bool,
789) {
790    let total = askpass_map.len();
791    let max_concurrent: usize = 20;
792
793    std::thread::Builder::new()
794        .name("snippet-coordinator".into())
795        .spawn(move || {
796            let guards: std::sync::Arc<std::sync::Mutex<Vec<std::sync::Arc<ChildGuard>>>> =
797                std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
798
799            if parallel && total > 1 {
800                // Slot-based semaphore for concurrency limiting
801                let (slot_tx, slot_rx) = std::sync::mpsc::channel::<()>();
802                for _ in 0..max_concurrent.min(total) {
803                    let _ = slot_tx.send(());
804                }
805
806                let completed = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
807                let mut worker_handles = Vec::new();
808
809                for (alias, askpass) in askpass_map {
810                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
811                        break;
812                    }
813
814                    // Wait for a slot, checking cancel periodically
815                    loop {
816                        match slot_rx.recv_timeout(std::time::Duration::from_millis(100)) {
817                            Ok(()) => break,
818                            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
819                                if cancel.load(std::sync::atomic::Ordering::Relaxed) {
820                                    break;
821                                }
822                            }
823                            Err(_) => break, // channel closed
824                        }
825                    }
826
827                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
828                        break;
829                    }
830
831                    let config_path = config_path.clone();
832                    let env = std::sync::Arc::clone(&env);
833                    let command = command.clone();
834                    let bw_session = bw_session.clone();
835                    let has_tunnel = tunnel_aliases.contains(&alias);
836                    let tx = tx.clone();
837                    let slot_tx = slot_tx.clone();
838                    let guards = guards.clone();
839                    let completed = completed.clone();
840                    let total = total;
841
842                    let handle = std::thread::spawn(move || {
843                        // RAII guard: release semaphore slot even on panic
844                        struct SlotRelease(Option<std::sync::mpsc::Sender<()>>);
845                        impl Drop for SlotRelease {
846                            fn drop(&mut self) {
847                                if let Some(tx) = self.0.take() {
848                                    let _ = tx.send(());
849                                }
850                            }
851                        }
852                        let _slot = SlotRelease(Some(slot_tx));
853
854                        let host_ctx = crate::ssh_context::SshContext {
855                            alias: &alias,
856                            config_path: &config_path,
857                            askpass: askpass.as_deref(),
858                            bw_session: bw_session.as_deref(),
859                            has_tunnel,
860                            env: &env,
861                        };
862                        let guard = execute_host(run_id, &host_ctx, &command, &tx);
863
864                        // Insert guard BEFORE checking cancel so it can be cleaned up
865                        if let Some(g) = guard {
866                            guards.lock().unwrap_or_else(|e| e.into_inner()).push(g);
867                        }
868
869                        let c = completed.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
870                        let _ = tx.send(crate::event::AppEvent::SnippetProgress {
871                            run_id,
872                            completed: c,
873                            total,
874                        });
875                        // _slot dropped here, releasing semaphore
876                    });
877                    worker_handles.push(handle);
878                }
879
880                // Wait for all workers to finish
881                for handle in worker_handles {
882                    let _ = handle.join();
883                }
884            } else {
885                // Sequential execution
886                for (i, (alias, askpass)) in askpass_map.into_iter().enumerate() {
887                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
888                        break;
889                    }
890
891                    let has_tunnel = tunnel_aliases.contains(&alias);
892                    let host_ctx = crate::ssh_context::SshContext {
893                        alias: &alias,
894                        config_path: &config_path,
895                        askpass: askpass.as_deref(),
896                        bw_session: bw_session.as_deref(),
897                        has_tunnel,
898                        env: &env,
899                    };
900                    let guard = execute_host(run_id, &host_ctx, &command, &tx);
901
902                    if let Some(g) = guard {
903                        guards.lock().unwrap_or_else(|e| e.into_inner()).push(g);
904                    }
905
906                    let _ = tx.send(crate::event::AppEvent::SnippetProgress {
907                        run_id,
908                        completed: i + 1,
909                        total,
910                    });
911                }
912            }
913
914            let _ = tx.send(crate::event::AppEvent::SnippetAllDone { run_id });
915            // Guards dropped here, cleaning up any remaining children
916        })
917        .expect("failed to spawn snippet coordinator");
918}
919
920/// Run a snippet on a single host via SSH.
921/// When `capture` is true, stdout/stderr are piped and returned in the result.
922/// When `capture` is false, stdout/stderr are inherited (streamed to terminal
923/// in real-time) and the returned strings are empty.
924#[allow(clippy::too_many_arguments)]
925pub fn run_snippet(
926    alias: &str,
927    config_path: &Path,
928    env: &crate::runtime::env::Env,
929    command: &str,
930    askpass: Option<&str>,
931    bw_session: Option<&str>,
932    capture: bool,
933    has_active_tunnel: bool,
934) -> anyhow::Result<SnippetResult> {
935    // Renew the Vault SSH cert before connecting so container listing,
936    // inspect, logs, actions and file-browser operations get a fresh cert
937    // just like the interactive connect path does. No-op for non-vault hosts.
938    crate::runtime::helpers::ensure_vault_cert_for_alias(env, alias, config_path);
939
940    let mut cmd = base_ssh_command(
941        alias,
942        config_path,
943        command,
944        askpass,
945        bw_session,
946        has_active_tunnel,
947        capture,
948    );
949
950    if capture {
951        cmd.stdin(Stdio::null())
952            .stdout(Stdio::piped())
953            .stderr(Stdio::piped());
954    } else {
955        cmd.stdin(Stdio::inherit())
956            .stdout(Stdio::inherit())
957            .stderr(Stdio::inherit());
958    }
959
960    if capture {
961        let (status, stdout, stderr) = run_with_bounded_output(&mut cmd, alias)?;
962        Ok(SnippetResult {
963            status,
964            stdout,
965            stderr,
966        })
967    } else {
968        let status = cmd
969            .status()
970            .map_err(|e| anyhow::anyhow!("Failed to run ssh for '{}': {}", alias, e))?;
971
972        Ok(SnippetResult {
973            status,
974            stdout: String::new(),
975            stderr: String::new(),
976        })
977    }
978}
979
980/// Hard cap on stdout/stderr captured from an SSH child process.
981/// A hostile or malfunctioning remote can stream unbounded output and
982/// blow up purple's memory. 16 MB is far above any legitimate
983/// `docker inspect` / `docker logs --tail 200` / `ps -a` output (which
984/// peak at hundreds of KB) and below what would meaningfully stress
985/// the parse pipelines or terminal buffer.
986pub const SSH_OUTPUT_MAX_BYTES: usize = 16 * 1024 * 1024;
987
988/// Spawn a child with piped stdout/stderr and read each pipe with a
989/// hard byte cap. Once the cap is hit the remaining bytes are drained
990/// to a sink (so the child can exit cleanly) and a debug log records
991/// the truncation. Returns the captured payload as lossy-UTF8 Strings
992/// alongside the child's exit status.
993fn run_with_bounded_output(
994    cmd: &mut Command,
995    alias: &str,
996) -> anyhow::Result<(ExitStatus, String, String)> {
997    let mut child = cmd
998        .spawn()
999        .map_err(|e| anyhow::anyhow!("Failed to spawn ssh for '{}': {}", alias, e))?;
1000    let stdout = child.stdout.take();
1001    let stderr = child.stderr.take();
1002
1003    let alias_for_stdout = alias.to_string();
1004    let stdout_handle = std::thread::spawn(move || match stdout {
1005        Some(mut pipe) => {
1006            read_bounded(&mut pipe, SSH_OUTPUT_MAX_BYTES, &alias_for_stdout, "stdout")
1007        }
1008        None => Vec::new(),
1009    });
1010    let alias_for_stderr = alias.to_string();
1011    let stderr_handle = std::thread::spawn(move || match stderr {
1012        Some(mut pipe) => {
1013            read_bounded(&mut pipe, SSH_OUTPUT_MAX_BYTES, &alias_for_stderr, "stderr")
1014        }
1015        None => Vec::new(),
1016    });
1017
1018    // Join the drain threads BEFORE waiting on the child. The threads
1019    // own the pipe read-ends; when they finish (cap hit or EOF) the
1020    // reader handle drops and closes its side of the pipe, which
1021    // unblocks any pending write on the remote child. Waiting on the
1022    // child first would deadlock the moment stdout exceeded the kernel
1023    // pipe buffer (typically 64 KB on Linux): the child blocks on
1024    // write, the parent blocks on wait, neither thread runs.
1025    let stdout_bytes = stdout_handle.join().unwrap_or_default();
1026    let stderr_bytes = stderr_handle.join().unwrap_or_default();
1027    let status = child
1028        .wait()
1029        .map_err(|e| anyhow::anyhow!("ssh wait failed for '{}': {}", alias, e))?;
1030
1031    Ok((
1032        status,
1033        String::from_utf8_lossy(&stdout_bytes).to_string(),
1034        String::from_utf8_lossy(&stderr_bytes).to_string(),
1035    ))
1036}
1037
1038fn read_bounded<R: std::io::Read>(
1039    reader: &mut R,
1040    max: usize,
1041    alias: &str,
1042    stream: &str,
1043) -> Vec<u8> {
1044    let mut out = Vec::with_capacity(8 * 1024);
1045    let mut chunk = [0u8; 8 * 1024];
1046    loop {
1047        match reader.read(&mut chunk) {
1048            Ok(0) => break,
1049            Ok(n) => {
1050                if out.len() + n > max {
1051                    let allowed = max.saturating_sub(out.len());
1052                    out.extend_from_slice(&chunk[..allowed]);
1053                    log::warn!(
1054                        "[external] ssh {} for '{}' exceeded {} bytes; truncating remainder",
1055                        stream,
1056                        alias,
1057                        max
1058                    );
1059                    // Drain remaining bytes so the child can exit cleanly
1060                    // instead of blocking on a backpressured pipe.
1061                    let _ = std::io::copy(reader, &mut std::io::sink());
1062                    break;
1063                }
1064                out.extend_from_slice(&chunk[..n]);
1065            }
1066            Err(e) => {
1067                log::debug!("[external] ssh {stream} read error for '{alias}': {e}");
1068                break;
1069            }
1070        }
1071    }
1072    out
1073}
1074
1075/// Static, execution-free blast-radius analysis of a snippet command for the
1076/// Snippets detail IMPACT card. Implemented in [`crate::snippet_impact`]
1077/// (quote-aware segmenter + curated capability table); re-exported here so the
1078/// `crate::snippet::analyze_command` path stays stable.
1079pub use crate::snippet_impact::{Category, CommandImpact, Finding, Severity, analyze_command};
1080
1081/// Indices of snippets in `store` matching `query` by name, command or
1082/// description (case-insensitive). A `None` or empty query returns every index
1083/// in order. Shared by the host-list snippet picker and the Snippets tab.
1084pub fn filtered_indices(store: &SnippetStore, query: Option<&str>) -> Vec<usize> {
1085    match query {
1086        None | Some("") => (0..store.snippets.len()).collect(),
1087        Some(q) => store
1088            .snippets
1089            .iter()
1090            .enumerate()
1091            .filter(|(_, s)| {
1092                crate::app::contains_ci(&s.name, q)
1093                    || crate::app::contains_ci(&s.command, q)
1094                    || crate::app::contains_ci(&s.description, q)
1095            })
1096            .map(|(i, _)| i)
1097            .collect(),
1098    }
1099}
1100
1101#[cfg(test)]
1102#[path = "snippet_tests.rs"]
1103mod tests;