Skip to main content

purple_ssh/
snippet.rs

1use std::io;
2use std::path::{Path, PathBuf};
3use std::process::{Command, ExitStatus, Stdio};
4
5use crate::fs_util;
6
7/// A saved command snippet.
8#[derive(Debug, Clone, PartialEq)]
9pub struct Snippet {
10    pub name: String,
11    pub command: String,
12    pub description: String,
13}
14
15/// Result of running a snippet on a host.
16pub struct SnippetResult {
17    pub status: ExitStatus,
18    pub stdout: String,
19    pub stderr: String,
20}
21
22/// Snippet storage backed by ~/.purple/snippets (INI-style).
23#[derive(Debug, Clone, Default)]
24pub struct SnippetStore {
25    pub snippets: Vec<Snippet>,
26    /// Override path for save(). None uses the default ~/.purple/snippets.
27    pub path_override: Option<PathBuf>,
28}
29
30fn config_path() -> Option<PathBuf> {
31    dirs::home_dir().map(|h| h.join(".purple/snippets"))
32}
33
34impl SnippetStore {
35    /// Load snippets from ~/.purple/snippets.
36    /// Returns empty store if file doesn't exist (normal first-use).
37    pub fn load() -> Self {
38        let path = match config_path() {
39            Some(p) => p,
40            None => return Self::default(),
41        };
42        let content = match std::fs::read_to_string(&path) {
43            Ok(c) => c,
44            Err(e) if e.kind() == io::ErrorKind::NotFound => return Self::default(),
45            Err(e) => {
46                log::warn!("[config] Could not read {}: {}", path.display(), e);
47                return Self::default();
48            }
49        };
50        Self::parse(&content)
51    }
52
53    /// Parse INI-style snippet config.
54    pub fn parse(content: &str) -> Self {
55        let mut snippets = Vec::new();
56        let mut current: Option<Snippet> = None;
57
58        for line in content.lines() {
59            let trimmed = line.trim();
60            if trimmed.is_empty() || trimmed.starts_with('#') {
61                continue;
62            }
63            if trimmed.starts_with('[') && trimmed.ends_with(']') {
64                if let Some(snippet) = current.take() {
65                    if !snippet.command.is_empty()
66                        && !snippets.iter().any(|s: &Snippet| s.name == snippet.name)
67                    {
68                        snippets.push(snippet);
69                    }
70                }
71                let name = trimmed[1..trimmed.len() - 1].trim().to_string();
72                if snippets.iter().any(|s| s.name == name) {
73                    current = None;
74                    continue;
75                }
76                current = Some(Snippet {
77                    name,
78                    command: String::new(),
79                    description: String::new(),
80                });
81            } else if let Some(ref mut snippet) = current {
82                if let Some((key, value)) = trimmed.split_once('=') {
83                    let key = key.trim();
84                    // Trim whitespace around key but preserve value content
85                    // (only trim leading whitespace after '=', not trailing)
86                    let value = value.trim_start().to_string();
87                    match key {
88                        "command" => snippet.command = value,
89                        "description" => snippet.description = value,
90                        _ => {}
91                    }
92                }
93            }
94        }
95        if let Some(snippet) = current {
96            if !snippet.command.is_empty() && !snippets.iter().any(|s| s.name == snippet.name) {
97                snippets.push(snippet);
98            }
99        }
100        Self {
101            snippets,
102            path_override: None,
103        }
104    }
105
106    /// Save snippets to ~/.purple/snippets (atomic write, chmod 600).
107    pub fn save(&self) -> io::Result<()> {
108        if crate::demo_flag::is_demo() {
109            return Ok(());
110        }
111        let path = match &self.path_override {
112            Some(p) => p.clone(),
113            None => match config_path() {
114                Some(p) => p,
115                None => {
116                    return Err(io::Error::new(
117                        io::ErrorKind::NotFound,
118                        "Could not determine home directory",
119                    ));
120                }
121            },
122        };
123
124        let mut content = String::new();
125        for (i, snippet) in self.snippets.iter().enumerate() {
126            if i > 0 {
127                content.push('\n');
128            }
129            content.push_str(&format!("[{}]\n", snippet.name));
130            content.push_str(&format!("command={}\n", snippet.command));
131            if !snippet.description.is_empty() {
132                content.push_str(&format!("description={}\n", snippet.description));
133            }
134        }
135
136        fs_util::atomic_write(&path, content.as_bytes())
137    }
138
139    /// Get a snippet by name.
140    pub fn get(&self, name: &str) -> Option<&Snippet> {
141        self.snippets.iter().find(|s| s.name == name)
142    }
143
144    /// Add or replace a snippet.
145    pub fn set(&mut self, snippet: Snippet) {
146        if let Some(existing) = self.snippets.iter_mut().find(|s| s.name == snippet.name) {
147            *existing = snippet;
148        } else {
149            self.snippets.push(snippet);
150        }
151    }
152
153    /// Remove a snippet by name.
154    pub fn remove(&mut self, name: &str) {
155        self.snippets.retain(|s| s.name != name);
156    }
157}
158
159/// Validate a snippet name: non-empty, no leading/trailing whitespace,
160/// no `#`, no `[`, no `]`, no control characters.
161pub fn validate_name(name: &str) -> Result<(), String> {
162    if name.trim().is_empty() {
163        return Err("Snippet name cannot be empty.".to_string());
164    }
165    if name != name.trim() {
166        return Err("Snippet name cannot have leading or trailing whitespace.".to_string());
167    }
168    if name.contains('#') || name.contains('[') || name.contains(']') {
169        return Err("Snippet name cannot contain #, [ or ].".to_string());
170    }
171    if name.contains(|c: char| c.is_control()) {
172        return Err("Snippet name cannot contain control characters.".to_string());
173    }
174    Ok(())
175}
176
177/// Validate a snippet command: non-empty, no control characters (except tab).
178pub fn validate_command(command: &str) -> Result<(), String> {
179    if command.trim().is_empty() {
180        return Err("Command cannot be empty.".to_string());
181    }
182    if command.contains(|c: char| c.is_control() && c != '\t') {
183        return Err("Command cannot contain control characters.".to_string());
184    }
185    Ok(())
186}
187
188// =========================================================================
189// Parameter support
190// =========================================================================
191
192/// A parameter found in a snippet command template.
193#[derive(Debug, Clone, PartialEq)]
194pub struct SnippetParam {
195    pub name: String,
196    pub default: Option<String>,
197}
198
199/// Shell-escape a string with single quotes (POSIX).
200/// Internal single quotes are escaped as `'\''`.
201pub fn shell_escape(s: &str) -> String {
202    format!("'{}'", s.replace('\'', "'\\''"))
203}
204
205/// Parse `{{name}}` and `{{name:default}}` from a command string.
206/// Returns params in order of first appearance, deduplicated. Max 20 params.
207pub fn parse_params(command: &str) -> Vec<SnippetParam> {
208    let mut params = Vec::new();
209    let mut seen = std::collections::HashSet::new();
210    let bytes = command.as_bytes();
211    let len = bytes.len();
212    let mut i = 0;
213    while i + 3 < len {
214        if bytes[i] == b'{' && bytes.get(i + 1) == Some(&b'{') {
215            if let Some(end) = command[i + 2..].find("}}") {
216                let inner = &command[i + 2..i + 2 + end];
217                let (name, default) = if let Some((n, d)) = inner.split_once(':') {
218                    (n.to_string(), Some(d.to_string()))
219                } else {
220                    (inner.to_string(), None)
221                };
222                if validate_param_name(&name).is_ok() && !seen.contains(&name) && params.len() < 20
223                {
224                    seen.insert(name.clone());
225                    params.push(SnippetParam { name, default });
226                }
227                i = i + 2 + end + 2;
228                continue;
229            }
230        }
231        i += 1;
232    }
233    params
234}
235
236/// Validate a parameter name: non-empty, alphanumeric/underscore/hyphen only.
237/// Rejects `{`, `}`, `'`, whitespace and control chars.
238pub fn validate_param_name(name: &str) -> Result<(), String> {
239    if name.is_empty() {
240        return Err("Parameter name cannot be empty.".to_string());
241    }
242    if !name
243        .chars()
244        .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
245    {
246        return Err(format!(
247            "Parameter name '{}' contains invalid characters.",
248            name
249        ));
250    }
251    Ok(())
252}
253
254/// Substitute parameters into a command template (single-pass).
255/// All values (user-provided and defaults) are shell-escaped.
256pub fn substitute_params(
257    command: &str,
258    values: &std::collections::HashMap<String, String>,
259) -> String {
260    let mut result = String::with_capacity(command.len());
261    let bytes = command.as_bytes();
262    let len = bytes.len();
263    let mut i = 0;
264    while i < len {
265        if i + 3 < len && bytes[i] == b'{' && bytes[i + 1] == b'{' {
266            if let Some(end) = command[i + 2..].find("}}") {
267                let inner = &command[i + 2..i + 2 + end];
268                let (name, default) = if let Some((n, d)) = inner.split_once(':') {
269                    (n, Some(d))
270                } else {
271                    (inner, None)
272                };
273                let value = values
274                    .get(name)
275                    .filter(|v| !v.is_empty())
276                    .map(|v| v.as_str())
277                    .or(default)
278                    .unwrap_or("");
279                result.push_str(&shell_escape(value));
280                i = i + 2 + end + 2;
281                continue;
282            }
283        }
284        // Properly decode UTF-8 character (not byte-level cast)
285        let ch = command[i..].chars().next().unwrap();
286        result.push(ch);
287        i += ch.len_utf8();
288    }
289    result
290}
291
292// =========================================================================
293// Output sanitization
294// =========================================================================
295
296/// Strip ANSI escape sequences and C1 control codes from output.
297/// Handles CSI, OSC, DCS, SOS, PM and APC sequences plus the C1 range 0x80-0x9F.
298pub fn sanitize_output(input: &str) -> String {
299    let mut out = String::with_capacity(input.len());
300    let mut chars = input.chars().peekable();
301    while let Some(c) = chars.next() {
302        match c {
303            '\x1b' => {
304                match chars.peek() {
305                    Some('[') => {
306                        chars.next();
307                        // CSI: consume until 0x40-0x7E
308                        while let Some(&ch) = chars.peek() {
309                            chars.next();
310                            if ('\x40'..='\x7e').contains(&ch) {
311                                break;
312                            }
313                        }
314                    }
315                    Some(']') | Some('P') | Some('X') | Some('^') | Some('_') => {
316                        chars.next();
317                        // OSC/DCS/SOS/PM/APC: consume until ST (ESC\) or BEL
318                        consume_until_st(&mut chars);
319                    }
320                    _ => {
321                        // Single ESC + one char
322                        chars.next();
323                    }
324                }
325            }
326            c if ('\u{0080}'..='\u{009F}').contains(&c) => {
327                // C1 control codes: skip
328            }
329            c if c.is_control() && c != '\n' && c != '\t' => {
330                // Other control chars (except newline/tab): skip
331            }
332            _ => out.push(c),
333        }
334    }
335    out
336}
337
338/// Consume chars until String Terminator (ESC\) or BEL (\x07).
339fn consume_until_st(chars: &mut std::iter::Peekable<std::str::Chars<'_>>) {
340    while let Some(&ch) = chars.peek() {
341        if ch == '\x07' {
342            chars.next();
343            break;
344        }
345        if ch == '\x1b' {
346            chars.next();
347            if chars.peek() == Some(&'\\') {
348                chars.next();
349            }
350            break;
351        }
352        chars.next();
353    }
354}
355
356// =========================================================================
357// Background snippet execution
358// =========================================================================
359
360/// Maximum lines stored per host. Reader continues draining beyond this
361/// to prevent child from blocking on a full pipe buffer.
362const MAX_OUTPUT_LINES: usize = 10_000;
363
364/// RAII guard that kills the process group on drop.
365/// Uses SIGTERM first, then escalates to SIGKILL after a brief wait.
366pub struct ChildGuard {
367    inner: std::sync::Mutex<Option<std::process::Child>>,
368    pgid: i32,
369}
370
371impl ChildGuard {
372    fn new(child: std::process::Child) -> Self {
373        // i32::try_from avoids silent overflow for PIDs > i32::MAX.
374        // Fallback -1 makes killpg a harmless no-op on overflow.
375        // In practice Linux caps PIDs well below i32::MAX.
376        let pgid = i32::try_from(child.id()).unwrap_or(-1);
377        Self {
378            inner: std::sync::Mutex::new(Some(child)),
379            pgid,
380        }
381    }
382}
383
384impl Drop for ChildGuard {
385    fn drop(&mut self) {
386        let mut lock = self.inner.lock().unwrap_or_else(|e| e.into_inner());
387        if let Some(ref mut child) = *lock {
388            // Already exited? Skip kill entirely (PID may be recycled).
389            if let Ok(Some(_)) = child.try_wait() {
390                return;
391            }
392            // SAFETY: self.pgid was set by setpgid(0,0) in pre_exec and is
393            // valid for the lifetime of this SnippetChild. kill() with a
394            // negative PID sends the signal to the entire process group.
395            // ESRCH (process already exited) is the expected race; the
396            // return value is intentionally ignored.
397            #[cfg(unix)]
398            unsafe {
399                libc::kill(-self.pgid, libc::SIGTERM);
400            }
401            // Poll for up to 500ms
402            let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
403            loop {
404                if let Ok(Some(_)) = child.try_wait() {
405                    return;
406                }
407                if std::time::Instant::now() >= deadline {
408                    break;
409                }
410                std::thread::sleep(std::time::Duration::from_millis(50));
411            }
412            // SAFETY: same invariants as the SIGTERM call above.
413            #[cfg(unix)]
414            unsafe {
415                libc::kill(-self.pgid, libc::SIGKILL);
416            }
417            // Fallback: direct kill in case setpgid failed in pre_exec
418            let _ = child.kill();
419            let _ = child.wait();
420        }
421    }
422}
423
424/// Read lines from a pipe. Stores up to `MAX_OUTPUT_LINES` but continues
425/// draining the pipe after that to prevent the child from blocking.
426fn read_pipe_capped<R: io::Read>(reader: R) -> String {
427    use io::BufRead;
428    let mut reader = io::BufReader::new(reader);
429    let mut output = String::new();
430    let mut line_count = 0;
431    let mut capped = false;
432    let mut buf = Vec::new();
433    loop {
434        buf.clear();
435        match reader.read_until(b'\n', &mut buf) {
436            Ok(0) => break, // EOF
437            Ok(_) => {
438                if !capped {
439                    if line_count < MAX_OUTPUT_LINES {
440                        if line_count > 0 {
441                            output.push('\n');
442                        }
443                        // Strip trailing newline (and \r for CRLF)
444                        if buf.last() == Some(&b'\n') {
445                            buf.pop();
446                            if buf.last() == Some(&b'\r') {
447                                buf.pop();
448                            }
449                        }
450                        // Lossy conversion handles non-UTF-8 output
451                        output.push_str(&String::from_utf8_lossy(&buf));
452                        line_count += 1;
453                    } else {
454                        output.push_str("\n[Output truncated at 10,000 lines]");
455                        capped = true;
456                    }
457                }
458                // If capped, keep reading but discard to drain the pipe
459            }
460            Err(_) => break,
461        }
462    }
463    output
464}
465
466/// Build the base SSH command with shared options for snippet execution.
467/// Sets -F, ConnectTimeout, ControlMaster/ControlPath and ClearAllForwardings.
468/// Also configures askpass and Bitwarden session env vars.
469///
470/// When `non_interactive` is true, adds `-o StrictHostKeyChecking=yes` so an
471/// unknown host returns an error instead of writing a prompt to the controlling
472/// tty. Background fetches (container listings, file browser listings, captured
473/// snippet output) pass `true`. Direct CLI use passes `false` so users retain
474/// normal host-key trust-on-first-use behaviour.
475fn base_ssh_command(
476    alias: &str,
477    config_path: &Path,
478    command: &str,
479    askpass: Option<&str>,
480    bw_session: Option<&str>,
481    has_active_tunnel: bool,
482    non_interactive: bool,
483) -> Command {
484    let mut cmd = Command::new("ssh");
485    cmd.arg("-F")
486        .arg(config_path)
487        .arg("-o")
488        .arg("ConnectTimeout=10")
489        .arg("-o")
490        .arg("ControlMaster=no")
491        .arg("-o")
492        .arg("ControlPath=none");
493
494    if non_interactive {
495        cmd.arg("-o").arg("StrictHostKeyChecking=yes");
496    }
497
498    if has_active_tunnel {
499        cmd.arg("-o").arg("ClearAllForwardings=yes");
500    }
501
502    cmd.arg("--").arg(alias).arg(command);
503
504    if askpass.is_some() {
505        crate::askpass_env::configure_ssh_command(&mut cmd, alias, config_path);
506    }
507
508    if let Some(token) = bw_session {
509        cmd.env("BW_SESSION", token);
510    }
511
512    cmd
513}
514
515/// Build the SSH Command for a snippet execution with piped I/O.
516fn build_snippet_command(
517    alias: &str,
518    config_path: &Path,
519    command: &str,
520    askpass: Option<&str>,
521    bw_session: Option<&str>,
522    has_active_tunnel: bool,
523) -> Command {
524    let mut cmd = base_ssh_command(
525        alias,
526        config_path,
527        command,
528        askpass,
529        bw_session,
530        has_active_tunnel,
531        true,
532    );
533    cmd.stdin(Stdio::null())
534        .stdout(Stdio::piped())
535        .stderr(Stdio::piped());
536
537    // Isolate child into its own process group so we can kill the
538    // entire tree without affecting purple itself.
539    #[cfg(unix)]
540    // SAFETY: the pre-fork callback runs between fork and the exec syscall in
541    // the child; only async-signal-safe calls are permitted. `setpgid(0, 0)`
542    // is async-signal-safe per POSIX and does not touch Rust runtime state.
543    unsafe {
544        use std::os::unix::process::CommandExt;
545        cmd.pre_exec(|| {
546            libc::setpgid(0, 0);
547            Ok(())
548        });
549    }
550
551    cmd
552}
553
554/// Execute a single host: spawn SSH, read output, wait, send result.
555fn execute_host(
556    run_id: u64,
557    ctx: &crate::ssh_context::SshContext<'_>,
558    command: &str,
559    tx: &std::sync::mpsc::Sender<crate::event::AppEvent>,
560) -> Option<std::sync::Arc<ChildGuard>> {
561    let alias = ctx.alias;
562    let mut cmd = build_snippet_command(
563        alias,
564        ctx.config_path,
565        command,
566        ctx.askpass,
567        ctx.bw_session,
568        ctx.has_tunnel,
569    );
570
571    match cmd.spawn() {
572        Ok(child) => {
573            let guard = std::sync::Arc::new(ChildGuard::new(child));
574
575            // Take stdout/stderr BEFORE wait to avoid pipe deadlock
576            let stdout_pipe = {
577                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
578                lock.as_mut().and_then(|c| c.stdout.take())
579            };
580            let stderr_pipe = {
581                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
582                lock.as_mut().and_then(|c| c.stderr.take())
583            };
584
585            // Spawn reader threads
586            let stdout_handle = std::thread::spawn(move || match stdout_pipe {
587                Some(pipe) => read_pipe_capped(pipe),
588                None => String::new(),
589            });
590            let stderr_handle = std::thread::spawn(move || match stderr_pipe {
591                Some(pipe) => read_pipe_capped(pipe),
592                None => String::new(),
593            });
594
595            // Join readers BEFORE wait to guarantee all output is received
596            let stdout_text = stdout_handle.join().unwrap_or_else(|_| {
597                log::warn!("[purple] Snippet stdout reader thread panicked");
598                String::new()
599            });
600            let stderr_text = stderr_handle.join().unwrap_or_else(|_| {
601                log::warn!("[purple] Snippet stderr reader thread panicked");
602                String::new()
603            });
604
605            // Now wait for the child to exit, then take it out of the
606            // guard so Drop won't kill a potentially recycled PID.
607            let exit_code = {
608                let mut lock = guard.inner.lock().unwrap_or_else(|e| e.into_inner());
609                let status = lock.as_mut().and_then(|c| c.wait().ok());
610                let _ = lock.take(); // Prevent ChildGuard::drop from killing recycled PID
611                status.and_then(|s| {
612                    #[cfg(unix)]
613                    {
614                        use std::os::unix::process::ExitStatusExt;
615                        s.code().or_else(|| s.signal().map(|sig| 128 + sig))
616                    }
617                    #[cfg(not(unix))]
618                    {
619                        s.code()
620                    }
621                })
622            };
623
624            let _ = tx.send(crate::event::AppEvent::SnippetHostDone {
625                run_id,
626                alias: alias.to_string(),
627                stdout: sanitize_output(&stdout_text),
628                stderr: sanitize_output(&stderr_text),
629                exit_code,
630            });
631
632            Some(guard)
633        }
634        Err(e) => {
635            let _ = tx.send(crate::event::AppEvent::SnippetHostDone {
636                run_id,
637                alias: alias.to_string(),
638                stdout: String::new(),
639                stderr: format!("Failed to launch ssh: {}", e),
640                exit_code: None,
641            });
642            None
643        }
644    }
645}
646
647/// Spawn background snippet execution on multiple hosts.
648/// The coordinator thread drives sequential or parallel host iteration.
649#[allow(clippy::too_many_arguments)]
650pub fn spawn_snippet_execution(
651    run_id: u64,
652    askpass_map: Vec<(String, Option<String>)>,
653    config_path: PathBuf,
654    command: String,
655    bw_session: Option<String>,
656    tunnel_aliases: std::collections::HashSet<String>,
657    cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
658    tx: std::sync::mpsc::Sender<crate::event::AppEvent>,
659    parallel: bool,
660) {
661    let total = askpass_map.len();
662    let max_concurrent: usize = 20;
663
664    std::thread::Builder::new()
665        .name("snippet-coordinator".into())
666        .spawn(move || {
667            let guards: std::sync::Arc<std::sync::Mutex<Vec<std::sync::Arc<ChildGuard>>>> =
668                std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
669
670            if parallel && total > 1 {
671                // Slot-based semaphore for concurrency limiting
672                let (slot_tx, slot_rx) = std::sync::mpsc::channel::<()>();
673                for _ in 0..max_concurrent.min(total) {
674                    let _ = slot_tx.send(());
675                }
676
677                let completed = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
678                let mut worker_handles = Vec::new();
679
680                for (alias, askpass) in askpass_map {
681                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
682                        break;
683                    }
684
685                    // Wait for a slot, checking cancel periodically
686                    loop {
687                        match slot_rx.recv_timeout(std::time::Duration::from_millis(100)) {
688                            Ok(()) => break,
689                            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
690                                if cancel.load(std::sync::atomic::Ordering::Relaxed) {
691                                    break;
692                                }
693                            }
694                            Err(_) => break, // channel closed
695                        }
696                    }
697
698                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
699                        break;
700                    }
701
702                    let config_path = config_path.clone();
703                    let command = command.clone();
704                    let bw_session = bw_session.clone();
705                    let has_tunnel = tunnel_aliases.contains(&alias);
706                    let tx = tx.clone();
707                    let slot_tx = slot_tx.clone();
708                    let guards = guards.clone();
709                    let completed = completed.clone();
710                    let total = total;
711
712                    let handle = std::thread::spawn(move || {
713                        // RAII guard: release semaphore slot even on panic
714                        struct SlotRelease(Option<std::sync::mpsc::Sender<()>>);
715                        impl Drop for SlotRelease {
716                            fn drop(&mut self) {
717                                if let Some(tx) = self.0.take() {
718                                    let _ = tx.send(());
719                                }
720                            }
721                        }
722                        let _slot = SlotRelease(Some(slot_tx));
723
724                        let host_ctx = crate::ssh_context::SshContext {
725                            alias: &alias,
726                            config_path: &config_path,
727                            askpass: askpass.as_deref(),
728                            bw_session: bw_session.as_deref(),
729                            has_tunnel,
730                        };
731                        let guard = execute_host(run_id, &host_ctx, &command, &tx);
732
733                        // Insert guard BEFORE checking cancel so it can be cleaned up
734                        if let Some(g) = guard {
735                            guards.lock().unwrap_or_else(|e| e.into_inner()).push(g);
736                        }
737
738                        let c = completed.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
739                        let _ = tx.send(crate::event::AppEvent::SnippetProgress {
740                            run_id,
741                            completed: c,
742                            total,
743                        });
744                        // _slot dropped here, releasing semaphore
745                    });
746                    worker_handles.push(handle);
747                }
748
749                // Wait for all workers to finish
750                for handle in worker_handles {
751                    let _ = handle.join();
752                }
753            } else {
754                // Sequential execution
755                for (i, (alias, askpass)) in askpass_map.into_iter().enumerate() {
756                    if cancel.load(std::sync::atomic::Ordering::Relaxed) {
757                        break;
758                    }
759
760                    let has_tunnel = tunnel_aliases.contains(&alias);
761                    let host_ctx = crate::ssh_context::SshContext {
762                        alias: &alias,
763                        config_path: &config_path,
764                        askpass: askpass.as_deref(),
765                        bw_session: bw_session.as_deref(),
766                        has_tunnel,
767                    };
768                    let guard = execute_host(run_id, &host_ctx, &command, &tx);
769
770                    if let Some(g) = guard {
771                        guards.lock().unwrap_or_else(|e| e.into_inner()).push(g);
772                    }
773
774                    let _ = tx.send(crate::event::AppEvent::SnippetProgress {
775                        run_id,
776                        completed: i + 1,
777                        total,
778                    });
779                }
780            }
781
782            let _ = tx.send(crate::event::AppEvent::SnippetAllDone { run_id });
783            // Guards dropped here, cleaning up any remaining children
784        })
785        .expect("failed to spawn snippet coordinator");
786}
787
788/// Run a snippet on a single host via SSH.
789/// When `capture` is true, stdout/stderr are piped and returned in the result.
790/// When `capture` is false, stdout/stderr are inherited (streamed to terminal
791/// in real-time) and the returned strings are empty.
792pub fn run_snippet(
793    alias: &str,
794    config_path: &Path,
795    command: &str,
796    askpass: Option<&str>,
797    bw_session: Option<&str>,
798    capture: bool,
799    has_active_tunnel: bool,
800) -> anyhow::Result<SnippetResult> {
801    let mut cmd = base_ssh_command(
802        alias,
803        config_path,
804        command,
805        askpass,
806        bw_session,
807        has_active_tunnel,
808        capture,
809    );
810
811    if capture {
812        cmd.stdin(Stdio::null())
813            .stdout(Stdio::piped())
814            .stderr(Stdio::piped());
815    } else {
816        cmd.stdin(Stdio::inherit())
817            .stdout(Stdio::inherit())
818            .stderr(Stdio::inherit());
819    }
820
821    if capture {
822        let output = cmd
823            .output()
824            .map_err(|e| anyhow::anyhow!("Failed to run ssh for '{}': {}", alias, e))?;
825
826        Ok(SnippetResult {
827            status: output.status,
828            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
829            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
830        })
831    } else {
832        let status = cmd
833            .status()
834            .map_err(|e| anyhow::anyhow!("Failed to run ssh for '{}': {}", alias, e))?;
835
836        Ok(SnippetResult {
837            status,
838            stdout: String::new(),
839            stderr: String::new(),
840        })
841    }
842}
843
844#[cfg(test)]
845#[path = "snippet_tests.rs"]
846mod tests;