Skip to main content

fresh/services/remote/
spawner.rs

1//! Process spawner abstraction
2//!
3//! Provides a trait for spawning processes that works transparently on both
4//! local and remote hosts. Used by the Editor's SpawnProcess handler (for
5//! plugins like git_grep) and by FileProvider (for `git ls-files`).
6//!
7//! Two orthogonal traits live here:
8//!
9//! - [`ProcessSpawner`] — one-shot "run and collect" commands. Callers get
10//!   `{stdout, stderr, exit_code}` back once the child exits. Used by
11//!   plugin `spawnProcess`, find-in-files, `git ls-files`, etc.
12//! - [`LongRunningSpawner`] — long-lived stdio processes (LSP servers,
13//!   future tool agents). Callers get a [`StdioChild`] they can talk to
14//!   via piped stdin/stdout/stderr and kill explicitly. LSP servers route
15//!   through this so an authority pointing at a container runs the server
16//!   inside the container (via `docker exec -i`) instead of on the host.
17
18use crate::services::process_hidden::HideWindow;
19use crate::services::process_limits::PostSpawnAction;
20use crate::services::remote::channel::{AgentChannel, ChannelError};
21use crate::services::remote::protocol::{decode_base64, exec_params};
22use crate::types::ProcessLimits;
23use std::path::Path;
24use std::process::ExitStatus;
25use std::sync::Arc;
26use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
27
28/// Result of spawning a process
29#[derive(Debug, Clone)]
30pub struct SpawnResult {
31    pub stdout: String,
32    pub stderr: String,
33    pub exit_code: i32,
34}
35
36/// Error from spawning a process
37#[derive(Debug, thiserror::Error)]
38pub enum SpawnError {
39    #[error("Channel error: {0}")]
40    Channel(#[from] ChannelError),
41
42    #[error("Process error: {0}")]
43    Process(String),
44
45    #[error("Decode error: {0}")]
46    Decode(String),
47}
48
49/// Trait for spawning processes (local or remote)
50///
51/// This abstraction allows plugins and core features (like file discovery)
52/// to spawn processes transparently on either local or remote filesystems.
53#[async_trait::async_trait]
54pub trait ProcessSpawner: Send + Sync {
55    /// Spawn a process and wait for completion
56    async fn spawn(
57        &self,
58        command: String,
59        args: Vec<String>,
60        cwd: Option<String>,
61    ) -> Result<SpawnResult, SpawnError>;
62
63    /// Spawn a process, piping stdout directly to a file instead of
64    /// buffering it in memory. Default impl buffers and writes; concrete
65    /// implementations should override when a streaming path exists.
66    ///
67    /// `SpawnResult.stdout` is empty on success — the bytes are on disk
68    /// at `stdout_to` instead. `stderr` and `exit_code` work as usual.
69    async fn spawn_to_file(
70        &self,
71        command: String,
72        args: Vec<String>,
73        cwd: Option<String>,
74        stdout_to: std::path::PathBuf,
75    ) -> Result<SpawnResult, SpawnError> {
76        // Fallback: collect in memory then write. Concrete impls override
77        // to pipe directly.
78        let result = self.spawn(command, args, cwd).await?;
79        if result.exit_code == 0 || !result.stdout.is_empty() {
80            std::fs::write(&stdout_to, result.stdout.as_bytes())
81                .map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
82        }
83        Ok(SpawnResult {
84            stdout: String::new(),
85            stderr: result.stderr,
86            exit_code: result.exit_code,
87        })
88    }
89
90    /// Spawn a process that can be cancelled mid-flight via a oneshot
91    /// receiver. When `stdout_to` is `Some`, stdout streams to the file;
92    /// when `None`, it's buffered into `SpawnResult.stdout`.
93    ///
94    /// If `kill_rx` fires before the child exits, the child is killed and
95    /// the result reflects the killed exit status.
96    ///
97    /// Default impl ignores `kill_rx` (no true cancellation for backends
98    /// that buffer in memory). Local override implements real kill.
99    async fn spawn_cancellable(
100        &self,
101        command: String,
102        args: Vec<String>,
103        cwd: Option<String>,
104        stdout_to: Option<std::path::PathBuf>,
105        _kill_rx: tokio::sync::oneshot::Receiver<()>,
106    ) -> Result<SpawnResult, SpawnError> {
107        match stdout_to {
108            Some(p) => self.spawn_to_file(command, args, cwd, p).await,
109            None => self.spawn(command, args, cwd).await,
110        }
111    }
112}
113
114/// Local process spawner using tokio
115///
116/// Used for local file editing (the default).
117pub struct LocalProcessSpawner;
118
119#[async_trait::async_trait]
120impl ProcessSpawner for LocalProcessSpawner {
121    async fn spawn(
122        &self,
123        command: String,
124        args: Vec<String>,
125        cwd: Option<String>,
126    ) -> Result<SpawnResult, SpawnError> {
127        let mut cmd = tokio::process::Command::new(&command);
128        cmd.args(&args);
129        cmd.hide_window();
130
131        if let Some(ref dir) = cwd {
132            cmd.current_dir(dir);
133        }
134
135        let output = cmd
136            .output()
137            .await
138            .map_err(|e| SpawnError::Process(e.to_string()))?;
139
140        Ok(SpawnResult {
141            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
142            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
143            exit_code: output.status.code().unwrap_or(-1),
144        })
145    }
146
147    /// Cancellable streaming spawn. Handles both `stdout_to = Some(path)`
148    /// (pipe stdout to file) and `stdout_to = None` (buffer in memory),
149    /// with kill support via `kill_rx`.
150    async fn spawn_cancellable(
151        &self,
152        command: String,
153        args: Vec<String>,
154        cwd: Option<String>,
155        stdout_to: Option<std::path::PathBuf>,
156        kill_rx: tokio::sync::oneshot::Receiver<()>,
157    ) -> Result<SpawnResult, SpawnError> {
158        use std::process::Stdio;
159        use tokio::io::AsyncReadExt;
160
161        let mut cmd = tokio::process::Command::new(&command);
162        cmd.args(&args);
163        cmd.hide_window();
164        cmd.stdout(Stdio::piped());
165        cmd.stderr(Stdio::piped());
166        if let Some(ref dir) = cwd {
167            cmd.current_dir(dir);
168        }
169
170        // For file-output mode, ensure parent dir exists. Surface the
171        // failure as a SpawnError rather than silently dropping — if we
172        // can't make the dir, the File::create below would just fail
173        // with a less informative error.
174        if let Some(ref path) = stdout_to {
175            if let Some(parent) = path.parent() {
176                if !parent.as_os_str().is_empty() {
177                    tokio::fs::create_dir_all(parent).await.map_err(|e| {
178                        SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
179                    })?;
180                }
181            }
182        }
183
184        let mut child = cmd
185            .spawn()
186            .map_err(|e| SpawnError::Process(e.to_string()))?;
187
188        let mut child_stdout = child
189            .stdout
190            .take()
191            .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
192        let mut child_stderr = child
193            .stderr
194            .take()
195            .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
196
197        // Drain stdout (to file or buffer) and stderr concurrently —
198        // both must be drained or the child can stall on a full pipe.
199        let stdout_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> = match stdout_to {
200            Some(path) => tokio::spawn(async move {
201                let mut file = tokio::fs::File::create(&path).await?;
202                tokio::io::copy(&mut child_stdout, &mut file).await?;
203                use tokio::io::AsyncWriteExt;
204                // flush + sync are best-effort durability so a reader
205                // opening the file right after spawn resolves sees all
206                // bytes. The actual write happened in `copy` above; a
207                // flush error here only loses the durability hint.
208                if let Err(e) = file.flush().await {
209                    tracing::warn!("spawn_cancellable: file flush failed: {}", e);
210                }
211                if let Err(e) = file.sync_all().await {
212                    tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
213                }
214                Ok(Vec::new())
215            }),
216            None => tokio::spawn(async move {
217                let mut buf = Vec::new();
218                child_stdout.read_to_end(&mut buf).await?;
219                Ok(buf)
220            }),
221        };
222        let stderr_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> =
223            tokio::spawn(async move {
224                let mut buf = Vec::new();
225                child_stderr.read_to_end(&mut buf).await?;
226                Ok(buf)
227            });
228
229        // Race child.wait() against kill_rx so the dispatcher can kill
230        // mid-stream (e.g. user scrolled past the commit before git
231        // finished).
232        let exit_code = tokio::select! {
233            status = child.wait() => status
234                .map(|s| s.code().unwrap_or(-1))
235                .unwrap_or(-1),
236            _ = kill_rx => {
237                // start_kill fails only when the process has already
238                // exited — and we're about to `wait()` to reap either
239                // way, so the failure path collapses with the success
240                // path. Log at debug for diagnostic visibility.
241                if let Err(e) = child.start_kill() {
242                    tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
243                }
244                child.wait().await.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1)
245            }
246        };
247
248        // Both drain tasks must finish; on kill they get EOF when the
249        // child's pipes close.
250        let stdout_bytes = stdout_task
251            .await
252            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
253            .map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
254        let stderr_bytes = stderr_task
255            .await
256            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
257            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
258
259        Ok(SpawnResult {
260            stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
261            stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
262            exit_code,
263        })
264    }
265
266    /// Streaming override: pipe child stdout straight into `stdout_to`
267    /// via `tokio::io::copy`. The 43 MB stdout of `git show` for the
268    /// bun-rust-rewrite commit never lands in a single `String`.
269    async fn spawn_to_file(
270        &self,
271        command: String,
272        args: Vec<String>,
273        cwd: Option<String>,
274        stdout_to: std::path::PathBuf,
275    ) -> Result<SpawnResult, SpawnError> {
276        use std::process::Stdio;
277        use tokio::io::AsyncWriteExt;
278
279        let mut cmd = tokio::process::Command::new(&command);
280        cmd.args(&args);
281        cmd.hide_window();
282        cmd.stdout(Stdio::piped());
283        cmd.stderr(Stdio::piped());
284        if let Some(ref dir) = cwd {
285            cmd.current_dir(dir);
286        }
287
288        // Ensure the parent dir exists so the open below doesn't ENOENT.
289        // Surface failures rather than letting File::create error with a
290        // less informative message.
291        if let Some(parent) = stdout_to.parent() {
292            if !parent.as_os_str().is_empty() {
293                tokio::fs::create_dir_all(parent).await.map_err(|e| {
294                    SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
295                })?;
296            }
297        }
298
299        let mut file = tokio::fs::File::create(&stdout_to)
300            .await
301            .map_err(|e| SpawnError::Process(format!("create {:?}: {}", stdout_to, e)))?;
302
303        let mut child = cmd
304            .spawn()
305            .map_err(|e| SpawnError::Process(e.to_string()))?;
306
307        let mut child_stdout = child
308            .stdout
309            .take()
310            .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
311        let mut child_stderr = child
312            .stderr
313            .take()
314            .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
315
316        // Copy stdout to file and drain stderr concurrently. Both ends
317        // must be drained or the child can stall on a full pipe.
318        let stdout_task = tokio::spawn(async move {
319            let res = tokio::io::copy(&mut child_stdout, &mut file).await;
320            // flush + sync are best-effort durability so a reader
321            // opening the file right after spawn resolves sees all
322            // bytes. The data was already written in `copy` above; a
323            // flush error here only loses the durability hint.
324            if let Err(e) = file.flush().await {
325                tracing::warn!("spawn_to_file: file flush failed: {}", e);
326            }
327            if let Err(e) = file.sync_all().await {
328                tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
329            }
330            res
331        });
332        let stderr_task = tokio::spawn(async move {
333            let mut buf = Vec::new();
334            let res = tokio::io::copy(&mut child_stderr, &mut buf).await;
335            res.map(|_| buf)
336        });
337
338        let status = child
339            .wait()
340            .await
341            .map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;
342
343        // Drop the empty Vec from the streaming task — its only signal
344        // is success/failure of the io::copy and flush, propagated via
345        // the `?` operator.
346        stdout_task
347            .await
348            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
349            .map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
350        let stderr_bytes = stderr_task
351            .await
352            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
353            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
354
355        Ok(SpawnResult {
356            stdout: String::new(),
357            stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
358            exit_code: status.code().unwrap_or(-1),
359        })
360    }
361}
362
363/// Remote process spawner via SSH agent
364pub struct RemoteProcessSpawner {
365    channel: Arc<AgentChannel>,
366}
367
368impl RemoteProcessSpawner {
369    /// Create a new remote process spawner
370    pub fn new(channel: Arc<AgentChannel>) -> Self {
371        Self { channel }
372    }
373}
374
375#[async_trait::async_trait]
376impl ProcessSpawner for RemoteProcessSpawner {
377    async fn spawn(
378        &self,
379        command: String,
380        args: Vec<String>,
381        cwd: Option<String>,
382    ) -> Result<SpawnResult, SpawnError> {
383        let params = exec_params(&command, &args, cwd.as_deref());
384
385        // Use streaming request to get live output
386        let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
387
388        let mut stdout = Vec::new();
389        let mut stderr = Vec::new();
390
391        // Collect streaming output
392        while let Some(data) = data_rx.recv().await {
393            if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
394                if let Ok(decoded) = decode_base64(out) {
395                    stdout.extend_from_slice(&decoded);
396                }
397            }
398            if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
399                if let Ok(decoded) = decode_base64(err) {
400                    stderr.extend_from_slice(&decoded);
401                }
402            }
403        }
404
405        // Get final result
406        let result = result_rx
407            .await
408            .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
409            .map_err(SpawnError::Process)?;
410
411        let exit_code = result
412            .get("code")
413            .and_then(|v| v.as_i64())
414            .map(|c| c as i32)
415            .unwrap_or(-1);
416
417        Ok(SpawnResult {
418            stdout: String::from_utf8_lossy(&stdout).to_string(),
419            stderr: String::from_utf8_lossy(&stderr).to_string(),
420            exit_code,
421        })
422    }
423
424    async fn spawn_to_file(
425        &self,
426        _command: String,
427        _args: Vec<String>,
428        _cwd: Option<String>,
429        _stdout_to: std::path::PathBuf,
430    ) -> Result<SpawnResult, SpawnError> {
431        Err(SpawnError::Process(
432            "stdoutTo is not supported for remote processes".to_string(),
433        ))
434    }
435}
436
437/// A long-lived child process with piped stdio streams.
438///
439/// Wraps [`tokio::process::Child`] so the LSP code (and future callers
440/// like plugin-managed tool agents) doesn't reach into concrete process
441/// types — that way a container authority can transparently run the
442/// child through `docker exec -i` while the caller keeps talking to an
443/// ordinary stdin/stdout pair.
444///
445/// Streams are `Option`-wrapped so callers can [`Self::take_stdin`] /
446/// [`Self::take_stdout`] / [`Self::take_stderr`] into their own reader
447/// and writer tasks. After all streams are taken, the `StdioChild` is
448/// still useful for lifecycle control via [`Self::kill`] and
449/// [`Self::wait`].
450///
451/// `spawned_locally` tells callers whether `id()` names the real child
452/// process (true for local spawns) or an intermediate like `docker` /
453/// `ssh` (false). LSP's cgroup-attachment step keys off this — applying
454/// a cgroup to the `docker` CLI PID doesn't constrain the container-
455/// side server it exec'd.
456pub struct StdioChild {
457    inner: tokio::process::Child,
458    stdin: Option<ChildStdin>,
459    stdout: Option<ChildStdout>,
460    stderr: Option<ChildStderr>,
461    spawned_locally: bool,
462}
463
464impl StdioChild {
465    /// Construct a `StdioChild` from an already-spawned
466    /// `tokio::process::Child`. Pulls the piped streams out of the
467    /// child so callers can take them individually later.
468    ///
469    /// This constructor is for spawners that don't participate in
470    /// host-side resource limiting (the Docker variant is the
471    /// canonical example). Local spawners should prefer
472    /// [`Self::from_local_tokio_child`] so a `PostSpawnAction` produced
473    /// by [`ProcessLimits::apply_to_command`] is applied to the child's
474    /// PID before the spawner returns.
475    pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
476        let stdin = child.stdin.take();
477        let stdout = child.stdout.take();
478        let stderr = child.stderr.take();
479        Self {
480            inner: child,
481            stdin,
482            stdout,
483            stderr,
484            spawned_locally,
485        }
486    }
487
488    /// Construct a `StdioChild` for a locally-spawned child while
489    /// applying any host-side `PostSpawnAction` (cgroup attachment)
490    /// returned by [`ProcessLimits::apply_to_command`]. Best-effort:
491    /// failure to attach logs a warning but doesn't fail the spawn,
492    /// matching the pre-refactor behavior.
493    pub fn from_local_tokio_child(
494        child: tokio::process::Child,
495        post_spawn: PostSpawnAction,
496    ) -> Self {
497        let out = Self::from_tokio_child(child, true);
498        if let Some(pid) = out.inner.id() {
499            post_spawn.apply_to_child(pid);
500        }
501        out
502    }
503
504    /// Take the stdin stream. Returns `None` after the first call.
505    pub fn take_stdin(&mut self) -> Option<ChildStdin> {
506        self.stdin.take()
507    }
508
509    /// Take the stdout stream. Returns `None` after the first call.
510    pub fn take_stdout(&mut self) -> Option<ChildStdout> {
511        self.stdout.take()
512    }
513
514    /// Take the stderr stream. Returns `None` after the first call.
515    pub fn take_stderr(&mut self) -> Option<ChildStderr> {
516        self.stderr.take()
517    }
518
519    /// PID of the immediate child process. For local spawns this is
520    /// the LSP server itself; for docker/ssh this is the CLI wrapper.
521    /// Use [`Self::spawned_locally`] to tell which.
522    pub fn id(&self) -> Option<u32> {
523        self.inner.id()
524    }
525
526    /// `true` when the child PID names the real target process. Callers
527    /// that only apply host-side resource controls (cgroups, rlimits)
528    /// should skip their application when this is `false`.
529    pub fn spawned_locally(&self) -> bool {
530        self.spawned_locally
531    }
532
533    /// Request termination. Forwards to [`tokio::process::Child::kill`].
534    pub async fn kill(&mut self) -> std::io::Result<()> {
535        self.inner.kill().await
536    }
537
538    /// Await exit. Forwards to [`tokio::process::Child::wait`].
539    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
540        self.inner.wait().await
541    }
542}
543
544/// Spawner for long-lived stdio processes (LSP servers, tool agents).
545///
546/// Separate from [`ProcessSpawner`] because the APIs diverge in two
547/// ways that don't compose: [`ProcessSpawner::spawn`] awaits
548/// completion and returns collected output; callers of
549/// `LongRunningSpawner` need a live child they can read from and
550/// write to over time.
551///
552/// Authorities expose one of these alongside their filesystem and
553/// one-shot spawner. Routing LSP spawning through it is what gives
554/// container authorities in-container LSP without a special-cased
555/// branch in `LspHandle`.
556///
557/// Callers pass an optional [`ProcessLimits`] block so local spawners
558/// can honor host-side memory / CPU limits. Non-local variants (docker,
559/// ssh) don't have a meaningful way to impose host limits on their
560/// child — cgroups attached to the `docker` CLI PID don't reach into
561/// the container — and are expected to ignore them.
562#[async_trait::async_trait]
563pub trait LongRunningSpawner: Send + Sync {
564    /// Spawn `command` with `args` as a long-lived stdio child under
565    /// this authority. Stdin/stdout/stderr are piped so the caller can
566    /// hand them to dedicated reader/writer tasks. `limits`, when
567    /// provided, lets local spawners attach cgroups or `setrlimit`;
568    /// remote spawners are expected to ignore it (see trait docs).
569    async fn spawn_stdio(
570        &self,
571        command: &str,
572        args: &[String],
573        env: Vec<(String, String)>,
574        cwd: Option<&Path>,
575        limits: Option<&ProcessLimits>,
576    ) -> Result<StdioChild, SpawnError>;
577
578    /// Check whether `command` resolves to an executable under this
579    /// authority. Routed through the same spawner so an SSH authority
580    /// probes the remote `$PATH` and a container authority probes the
581    /// container's `$PATH` — unlike `which::which` which only ever sees
582    /// the host.
583    async fn command_exists(&self, command: &str) -> bool;
584}
585
586/// Local long-running spawner using `tokio::process::Command` directly.
587///
588/// Functionally equivalent to how `LspHandle::spawn` works today, but
589/// exposed through the trait so non-local authorities can substitute
590/// their own implementation without any LSP-side awareness. Applies
591/// any `ProcessLimits` passed in via the same machinery the
592/// pre-refactor LSP code used (`apply_to_command` + `apply_to_child`).
593pub struct LocalLongRunningSpawner;
594
595#[async_trait::async_trait]
596impl LongRunningSpawner for LocalLongRunningSpawner {
597    async fn spawn_stdio(
598        &self,
599        command: &str,
600        args: &[String],
601        env: Vec<(String, String)>,
602        cwd: Option<&Path>,
603        limits: Option<&ProcessLimits>,
604    ) -> Result<StdioChild, SpawnError> {
605        let mut cmd = tokio::process::Command::new(command);
606        cmd.args(args)
607            .envs(env)
608            .stdin(std::process::Stdio::piped())
609            .stdout(std::process::Stdio::piped())
610            .stderr(std::process::Stdio::piped())
611            .hide_window()
612            .kill_on_drop(true);
613        if let Some(dir) = cwd {
614            cmd.current_dir(dir);
615        }
616
617        // Apply pre-spawn hooks (cgroup path selection, setrlimit
618        // via `pre_exec`). Errors bubble up so callers see
619        // configuration problems early — matches the pre-refactor
620        // LSP behavior.
621        let post_spawn = match limits {
622            Some(lim) => lim
623                .apply_to_command(&mut cmd)
624                .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
625            None => PostSpawnAction::default(),
626        };
627
628        let child = cmd
629            .spawn()
630            .map_err(|e| SpawnError::Process(e.to_string()))?;
631        Ok(StdioChild::from_local_tokio_child(child, post_spawn))
632    }
633
634    async fn command_exists(&self, command: &str) -> bool {
635        which::which(command).is_ok()
636    }
637}
638
639#[cfg(test)]
640mod tests {
641    use super::*;
642    use tokio::io::AsyncReadExt;
643
644    #[tokio::test]
645    async fn test_local_spawner() {
646        let spawner = LocalProcessSpawner;
647        let result = spawner
648            .spawn("echo".to_string(), vec!["hello".to_string()], None)
649            .await
650            .unwrap();
651
652        assert_eq!(result.exit_code, 0);
653        assert!(result.stdout.trim() == "hello");
654    }
655
656    #[tokio::test]
657    async fn test_local_spawner_stdout_to_file() {
658        let spawner = LocalProcessSpawner;
659        let tmp =
660            std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
661        // Best-effort cleanup of any leftover from a previous run.
662        // Failure (e.g. NotFound) is fine — the spawn below will
663        // create the file fresh.
664        #[allow(clippy::let_underscore_must_use)]
665        let _ = std::fs::remove_file(&tmp);
666        let result = spawner
667            .spawn_to_file(
668                "echo".to_string(),
669                vec!["hello-from-disk".to_string()],
670                None,
671                tmp.clone(),
672            )
673            .await
674            .unwrap();
675
676        assert_eq!(result.exit_code, 0);
677        assert!(
678            result.stdout.is_empty(),
679            "stdout should be empty when streaming"
680        );
681        let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
682        assert_eq!(contents.trim(), "hello-from-disk");
683        // Best-effort cleanup — leaving a temp file behind on
684        // failure is acceptable and the next run's pre-cleanup
685        // handles it.
686        #[allow(clippy::let_underscore_must_use)]
687        let _ = std::fs::remove_file(&tmp);
688    }
689
690    #[tokio::test]
691    async fn test_local_spawner_cancellable_kill() {
692        let spawner = LocalProcessSpawner;
693        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
694
695        // Start a sleep that would take 30s normally; fire kill after 100ms.
696        let task = tokio::spawn(async move {
697            spawner
698                .spawn_cancellable(
699                    "sleep".to_string(),
700                    vec!["30".to_string()],
701                    None,
702                    None,
703                    kill_rx,
704                )
705                .await
706        });
707
708        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
709        // Fire the kill. Err means the receiver was dropped (task
710        // already finished), which would mean the 30s sleep returned
711        // promptly on its own — impossible in this test window, but
712        // not worth a panic either way; the subsequent task.await
713        // surfaces any real problem.
714        #[allow(clippy::let_underscore_must_use)]
715        let _ = kill_tx.send(());
716
717        let start = std::time::Instant::now();
718        let result = task.await.unwrap().unwrap();
719        let elapsed = start.elapsed();
720
721        // SIGKILL'd sleep on Unix returns exit_code 137 or -1 (no code).
722        // The point is we returned promptly, not after 30s.
723        assert!(
724            elapsed < std::time::Duration::from_secs(5),
725            "kill should be prompt, took {:?}",
726            elapsed
727        );
728        assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
729    }
730
731    #[tokio::test]
732    async fn local_long_running_spawn_stdio_pipes_output() {
733        let spawner = LocalLongRunningSpawner;
734        let mut child = spawner
735            .spawn_stdio(
736                "sh",
737                &["-c".into(), "echo hi".into()],
738                Vec::new(),
739                None,
740                None,
741            )
742            .await
743            .expect("spawn succeeds");
744
745        let mut stdout = child.take_stdout().expect("stdout piped");
746        let mut buf = String::new();
747        stdout.read_to_string(&mut buf).await.unwrap();
748        assert_eq!(buf.trim(), "hi");
749
750        let status = child.wait().await.unwrap();
751        assert!(status.success());
752        assert!(child.spawned_locally());
753    }
754
755    #[tokio::test]
756    async fn local_long_running_command_exists_for_sh() {
757        let spawner = LocalLongRunningSpawner;
758        assert!(spawner.command_exists("sh").await);
759        assert!(
760            !spawner
761                .command_exists("fresh-unlikely-binary-name-ygzu9")
762                .await
763        );
764    }
765}