Skip to main content

fresh/services/remote/
spawner.rs

1//! Process spawner abstraction
2//!
3//! Provides a trait for spawning processes that works transparently on both
4//! local and remote hosts. Used by the Editor's SpawnProcess handler (for
5//! plugins like git_grep) and by FileProvider (for `git ls-files`).
6//!
7//! Two orthogonal traits live here:
8//!
9//! - [`ProcessSpawner`] — one-shot "run and collect" commands. Callers get
10//!   `{stdout, stderr, exit_code}` back once the child exits. Used by
11//!   plugin `spawnProcess`, find-in-files, `git ls-files`, etc.
12//! - [`LongRunningSpawner`] — long-lived stdio processes (LSP servers,
13//!   future tool agents). Callers get a [`StdioChild`] they can talk to
14//!   via piped stdin/stdout/stderr and kill explicitly. LSP servers route
15//!   through this so an authority pointing at a container runs the server
16//!   inside the container (via `docker exec -i`) instead of on the host.
17
18use crate::services::env_provider::EnvProvider;
19use crate::services::process_hidden::HideWindow;
20use crate::services::process_limits::PostSpawnAction;
21use crate::services::remote::channel::{AgentChannel, ChannelError};
22use crate::services::remote::protocol::{decode_base64, exec_params};
23use crate::services::workspace_trust::{gate, WorkspaceTrust};
24use crate::types::ProcessLimits;
25
26/// Capture the active environment for a *local* host: run the provider's
27/// capture script through `$SHELL -lc …` as a raw subprocess (no env applied —
28/// this is how the env is established, so it must not recurse). Returns the
29/// captured `KEY=VALUE` pairs, or empty when inactive / on failure.
30async fn local_captured_env(provider: &EnvProvider) -> Vec<(String, String)> {
31    provider
32        .current(|script| async move {
33            let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
34            let output = tokio::process::Command::new(&shell)
35                .arg("-lc")
36                .arg(&script)
37                .hide_window()
38                .output()
39                .await
40                .ok()?;
41            Some(String::from_utf8_lossy(&output.stdout).into_owned())
42        })
43        .await
44}
45use std::borrow::Cow;
46use std::path::Path;
47use std::process::ExitStatus;
48use std::sync::Arc;
49use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
50
/// Turn a program name into the form handed to `Command::new` for a
/// *local* child.
///
/// **Windows.** Node-based CLIs installed with `npm -g` are `.cmd`/`.bat`
/// shims. The `CreateProcess` path used by `std`/`tokio` only appends
/// `.exe` to a bare name and never consults `PATHEXT`, so a bare
/// `typescript-language-server` fails to spawn (issue #2324) even though
/// the existence check ([`crate::services::lsp::command_exists`]) — which
/// goes through `which` and *does* honor `PATHEXT` — reports it present.
/// Resolving through `which` here makes the check and the spawn agree:
/// `Command` receives a full path ending in `.cmd`/`.bat`, which `std`
/// (≥ 1.77.2) runs through `cmd.exe` with the CVE-2024-24576-safe
/// argument escaping. A name `which` cannot resolve is returned unchanged
/// so the spawn itself reports the error instead of hiding a typo.
///
/// **Everywhere else** the name is returned as-is; an ordinary `PATH`
/// lookup is already correct there.
#[cfg(windows)]
fn resolve_program(command: &str) -> Cow<'_, str> {
    which::which(command)
        .map(|resolved| resolved.to_string_lossy().into_owned())
        .map_or(Cow::Borrowed(command), Cow::Owned)
}

/// Non-Windows variant: hands the name back untouched (borrowed). The
/// Windows variant documents why resolution is only needed there.
#[cfg(not(windows))]
fn resolve_program(command: &str) -> Cow<'_, str> {
    Cow::from(command)
}
85
/// Result of running a one-shot process to completion.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare whole results
/// directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpawnResult {
    /// Standard output collected from the child. The spawners in this file
    /// convert it lossily from bytes to UTF-8, and leave it empty when
    /// stdout was streamed to a file instead.
    pub stdout: String,
    /// Standard error collected from the child (lossy UTF-8 conversion in
    /// the spawners in this file).
    pub stderr: String,
    /// Exit code of the child. The spawners in this file report `-1` when
    /// no exit code is available.
    pub exit_code: i32,
}
93
/// Error from spawning a process.
///
/// `Display` output is the variant's prefix followed by the wrapped
/// message (see the `#[error(...)]` attributes).
#[derive(Debug, thiserror::Error)]
pub enum SpawnError {
    /// The agent channel failed. `#[from]` lets `?` convert a
    /// [`ChannelError`] into this variant automatically.
    #[error("Channel error: {0}")]
    Channel(#[from] ChannelError),

    /// A process-level failure described by a free-form message. The
    /// spawners in this file use it for spawn, wait, pipe-drain and
    /// output-file errors, and for the remote side's reported failure.
    #[error("Process error: {0}")]
    Process(String),

    /// A payload could not be decoded.
    // NOTE(review): not constructed in the visible part of this file —
    // presumably meant for malformed agent data; confirm against callers.
    #[error("Decode error: {0}")]
    Decode(String),
}
106
107/// Trait for spawning processes (local or remote)
108///
109/// This abstraction allows plugins and core features (like file discovery)
110/// to spawn processes transparently on either local or remote filesystems.
111#[async_trait::async_trait]
112pub trait ProcessSpawner: Send + Sync {
113    /// Spawn a process and wait for completion
114    async fn spawn(
115        &self,
116        command: String,
117        args: Vec<String>,
118        cwd: Option<String>,
119    ) -> Result<SpawnResult, SpawnError>;
120
121    /// Spawn a process, piping stdout directly to a file instead of
122    /// buffering it in memory. Default impl buffers and writes; concrete
123    /// implementations should override when a streaming path exists.
124    ///
125    /// `SpawnResult.stdout` is empty on success — the bytes are on disk
126    /// at `stdout_to` instead. `stderr` and `exit_code` work as usual.
127    async fn spawn_to_file(
128        &self,
129        command: String,
130        args: Vec<String>,
131        cwd: Option<String>,
132        stdout_to: std::path::PathBuf,
133    ) -> Result<SpawnResult, SpawnError> {
134        // Fallback: collect in memory then write. Concrete impls override
135        // to pipe directly.
136        let result = self.spawn(command, args, cwd).await?;
137        if result.exit_code == 0 || !result.stdout.is_empty() {
138            std::fs::write(&stdout_to, result.stdout.as_bytes())
139                .map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
140        }
141        Ok(SpawnResult {
142            stdout: String::new(),
143            stderr: result.stderr,
144            exit_code: result.exit_code,
145        })
146    }
147
148    /// Spawn a process that can be cancelled mid-flight via a oneshot
149    /// receiver. When `stdout_to` is `Some`, stdout streams to the file;
150    /// when `None`, it's buffered into `SpawnResult.stdout`.
151    ///
152    /// If `kill_rx` fires before the child exits, the child is killed and
153    /// the result reflects the killed exit status.
154    ///
155    /// Default impl ignores `kill_rx` (no true cancellation for backends
156    /// that buffer in memory). Local override implements real kill.
157    async fn spawn_cancellable(
158        &self,
159        command: String,
160        args: Vec<String>,
161        cwd: Option<String>,
162        stdout_to: Option<std::path::PathBuf>,
163        _kill_rx: tokio::sync::oneshot::Receiver<()>,
164    ) -> Result<SpawnResult, SpawnError> {
165        match stdout_to {
166            Some(p) => self.spawn_to_file(command, args, cwd, p).await,
167            None => self.spawn(command, args, cwd).await,
168        }
169    }
170}
171
/// Local process spawner using tokio.
///
/// Used for local file editing (the default). The variables captured by
/// the `env` provider are set on every child on top of the inherited
/// process environment — this is how an activated environment manager
/// (venv / direnv / mise) injects its variables into one-shot spawns.
/// When the capture is empty the child simply inherits the process env.
pub struct LocalProcessSpawner {
    /// Provider whose captured environment is applied to each child.
    env: Arc<EnvProvider>,
    /// Workspace-trust state passed to `gate` before every spawn.
    trust: Arc<WorkspaceTrust>,
}
182
183impl LocalProcessSpawner {
184    /// Local spawner gated by `trust`, applying the live `env` provider's
185    /// captured environment to every child.
186    pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
187        Self { env, trust }
188    }
189
190    async fn apply_env(&self, cmd: &mut tokio::process::Command) {
191        let env = local_captured_env(&self.env).await;
192        if !env.is_empty() {
193            cmd.envs(env.iter().map(|(k, v)| (k.as_str(), v.as_str())));
194        }
195    }
196}
197
198#[async_trait::async_trait]
199impl ProcessSpawner for LocalProcessSpawner {
200    async fn spawn(
201        &self,
202        command: String,
203        args: Vec<String>,
204        cwd: Option<String>,
205    ) -> Result<SpawnResult, SpawnError> {
206        gate(&self.trust, &command, cwd.as_deref())?;
207        let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
208        cmd.args(&args);
209        self.apply_env(&mut cmd).await;
210        cmd.hide_window();
211
212        if let Some(ref dir) = cwd {
213            cmd.current_dir(dir);
214        }
215
216        let output = cmd
217            .output()
218            .await
219            .map_err(|e| SpawnError::Process(e.to_string()))?;
220
221        Ok(SpawnResult {
222            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
223            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
224            exit_code: output.status.code().unwrap_or(-1),
225        })
226    }
227
228    /// Cancellable streaming spawn. Handles both `stdout_to = Some(path)`
229    /// (pipe stdout to file) and `stdout_to = None` (buffer in memory),
230    /// with kill support via `kill_rx`.
231    async fn spawn_cancellable(
232        &self,
233        command: String,
234        args: Vec<String>,
235        cwd: Option<String>,
236        stdout_to: Option<std::path::PathBuf>,
237        kill_rx: tokio::sync::oneshot::Receiver<()>,
238    ) -> Result<SpawnResult, SpawnError> {
239        use std::process::Stdio;
240        use tokio::io::AsyncReadExt;
241
242        gate(&self.trust, &command, cwd.as_deref())?;
243        let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
244        cmd.args(&args);
245        self.apply_env(&mut cmd).await;
246        cmd.hide_window();
247        cmd.stdout(Stdio::piped());
248        cmd.stderr(Stdio::piped());
249        if let Some(ref dir) = cwd {
250            cmd.current_dir(dir);
251        }
252
253        // For file-output mode, ensure parent dir exists. Surface the
254        // failure as a SpawnError rather than silently dropping — if we
255        // can't make the dir, the File::create below would just fail
256        // with a less informative error.
257        if let Some(ref path) = stdout_to {
258            if let Some(parent) = path.parent() {
259                if !parent.as_os_str().is_empty() {
260                    tokio::fs::create_dir_all(parent).await.map_err(|e| {
261                        SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
262                    })?;
263                }
264            }
265        }
266
267        let mut child = cmd
268            .spawn()
269            .map_err(|e| SpawnError::Process(e.to_string()))?;
270
271        let mut child_stdout = child
272            .stdout
273            .take()
274            .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
275        let mut child_stderr = child
276            .stderr
277            .take()
278            .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
279
280        // Drain stdout (to file or buffer) and stderr concurrently —
281        // both must be drained or the child can stall on a full pipe.
282        let stdout_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> = match stdout_to {
283            Some(path) => tokio::spawn(async move {
284                let mut file = tokio::fs::File::create(&path).await?;
285                tokio::io::copy(&mut child_stdout, &mut file).await?;
286                use tokio::io::AsyncWriteExt;
287                // flush + sync are best-effort durability so a reader
288                // opening the file right after spawn resolves sees all
289                // bytes. The actual write happened in `copy` above; a
290                // flush error here only loses the durability hint.
291                if let Err(e) = file.flush().await {
292                    tracing::warn!("spawn_cancellable: file flush failed: {}", e);
293                }
294                if let Err(e) = file.sync_all().await {
295                    tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
296                }
297                Ok(Vec::new())
298            }),
299            None => tokio::spawn(async move {
300                let mut buf = Vec::new();
301                child_stdout.read_to_end(&mut buf).await?;
302                Ok(buf)
303            }),
304        };
305        let stderr_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> =
306            tokio::spawn(async move {
307                let mut buf = Vec::new();
308                child_stderr.read_to_end(&mut buf).await?;
309                Ok(buf)
310            });
311
312        // Race child.wait() against kill_rx so the dispatcher can kill
313        // mid-stream (e.g. user scrolled past the commit before git
314        // finished).
315        let exit_code = tokio::select! {
316            status = child.wait() => status
317                .map(|s| s.code().unwrap_or(-1))
318                .unwrap_or(-1),
319            _ = kill_rx => {
320                // start_kill fails only when the process has already
321                // exited — and we're about to `wait()` to reap either
322                // way, so the failure path collapses with the success
323                // path. Log at debug for diagnostic visibility.
324                if let Err(e) = child.start_kill() {
325                    tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
326                }
327                child.wait().await.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1)
328            }
329        };
330
331        // Both drain tasks must finish; on kill they get EOF when the
332        // child's pipes close.
333        let stdout_bytes = stdout_task
334            .await
335            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
336            .map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
337        let stderr_bytes = stderr_task
338            .await
339            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
340            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
341
342        Ok(SpawnResult {
343            stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
344            stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
345            exit_code,
346        })
347    }
348
349    /// Streaming override: pipe child stdout straight into `stdout_to`
350    /// via `tokio::io::copy`. The 43 MB stdout of `git show` for the
351    /// bun-rust-rewrite commit never lands in a single `String`.
352    async fn spawn_to_file(
353        &self,
354        command: String,
355        args: Vec<String>,
356        cwd: Option<String>,
357        stdout_to: std::path::PathBuf,
358    ) -> Result<SpawnResult, SpawnError> {
359        use std::process::Stdio;
360        use tokio::io::AsyncWriteExt;
361
362        gate(&self.trust, &command, cwd.as_deref())?;
363        let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
364        cmd.args(&args);
365        self.apply_env(&mut cmd).await;
366        cmd.hide_window();
367        cmd.stdout(Stdio::piped());
368        cmd.stderr(Stdio::piped());
369        if let Some(ref dir) = cwd {
370            cmd.current_dir(dir);
371        }
372
373        // Ensure the parent dir exists so the open below doesn't ENOENT.
374        // Surface failures rather than letting File::create error with a
375        // less informative message.
376        if let Some(parent) = stdout_to.parent() {
377            if !parent.as_os_str().is_empty() {
378                tokio::fs::create_dir_all(parent).await.map_err(|e| {
379                    SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
380                })?;
381            }
382        }
383
384        let mut file = tokio::fs::File::create(&stdout_to)
385            .await
386            .map_err(|e| SpawnError::Process(format!("create {:?}: {}", stdout_to, e)))?;
387
388        let mut child = cmd
389            .spawn()
390            .map_err(|e| SpawnError::Process(e.to_string()))?;
391
392        let mut child_stdout = child
393            .stdout
394            .take()
395            .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
396        let mut child_stderr = child
397            .stderr
398            .take()
399            .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
400
401        // Copy stdout to file and drain stderr concurrently. Both ends
402        // must be drained or the child can stall on a full pipe.
403        let stdout_task = tokio::spawn(async move {
404            let res = tokio::io::copy(&mut child_stdout, &mut file).await;
405            // flush + sync are best-effort durability so a reader
406            // opening the file right after spawn resolves sees all
407            // bytes. The data was already written in `copy` above; a
408            // flush error here only loses the durability hint.
409            if let Err(e) = file.flush().await {
410                tracing::warn!("spawn_to_file: file flush failed: {}", e);
411            }
412            if let Err(e) = file.sync_all().await {
413                tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
414            }
415            res
416        });
417        let stderr_task = tokio::spawn(async move {
418            let mut buf = Vec::new();
419            let res = tokio::io::copy(&mut child_stderr, &mut buf).await;
420            res.map(|_| buf)
421        });
422
423        let status = child
424            .wait()
425            .await
426            .map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;
427
428        // Drop the empty Vec from the streaming task — its only signal
429        // is success/failure of the io::copy and flush, propagated via
430        // the `?` operator.
431        stdout_task
432            .await
433            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
434            .map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
435        let stderr_bytes = stderr_task
436            .await
437            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
438            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
439
440        Ok(SpawnResult {
441            stdout: String::new(),
442            stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
443            exit_code: status.code().unwrap_or(-1),
444        })
445    }
446}
447
/// Prefix `(command, args)` with the captured `env` for backends that take
/// an argv array (SSH agent / docker) rather than a shell string.
///
/// With a non-empty `env` the result is the program `env` followed by
/// `K=V …`, then `command`, then `args…`. With an empty `env` the pair is
/// returned unchanged. `command` must not contain `=`, because `env` would
/// read it as another assignment — true for any ordinary program name.
fn env_wrap(env: &[(String, String)], command: &str, args: &[String]) -> (String, Vec<String>) {
    if env.is_empty() {
        return (command.to_string(), args.to_vec());
    }
    let argv: Vec<String> = env
        .iter()
        .map(|(key, value)| format!("{key}={value}"))
        .chain(std::iter::once(command.to_string()))
        .chain(args.iter().cloned())
        .collect();
    ("env".to_string(), argv)
}
465
/// Remote process spawner via SSH agent.
///
/// Every spawn is sent as an `exec` request over the agent channel; the
/// provider's environment is captured on the remote host and prepended to
/// the command through `env_wrap`.
pub struct RemoteProcessSpawner {
    /// Channel to the remote agent that executes the requests.
    channel: Arc<AgentChannel>,
    /// Provider whose environment is captured on the remote host.
    env: Arc<EnvProvider>,
    /// Workspace-trust state passed to `gate` before every spawn.
    trust: Arc<WorkspaceTrust>,
}
472
473impl RemoteProcessSpawner {
474    /// Create a new remote process spawner gated by `trust`, applying the live
475    /// `env` provider (captured on the remote host) to every spawn.
476    pub fn new(
477        channel: Arc<AgentChannel>,
478        env: Arc<EnvProvider>,
479        trust: Arc<WorkspaceTrust>,
480    ) -> Self {
481        Self {
482            channel,
483            env,
484            trust,
485        }
486    }
487
488    /// Capture the active env on the *remote* host by running the provider's
489    /// script through the agent's raw `exec` (no env applied — recursion-free).
490    async fn captured_env(&self) -> Vec<(String, String)> {
491        let channel = self.channel.clone();
492        self.env
493            .current(move |script| async move {
494                let params = exec_params("sh", &["-lc".to_string(), script], None);
495                let (mut data_rx, _result) =
496                    channel.request_streaming("exec", params).await.ok()?;
497                let mut stdout = Vec::new();
498                while let Some(d) = data_rx.recv().await {
499                    if let Some(out) = d.get("out").and_then(|v| v.as_str()) {
500                        if let Ok(b) = decode_base64(out) {
501                            stdout.extend_from_slice(&b);
502                        }
503                    }
504                }
505                Some(String::from_utf8_lossy(&stdout).into_owned())
506            })
507            .await
508    }
509}
510
511#[async_trait::async_trait]
512impl ProcessSpawner for RemoteProcessSpawner {
513    async fn spawn(
514        &self,
515        command: String,
516        args: Vec<String>,
517        cwd: Option<String>,
518    ) -> Result<SpawnResult, SpawnError> {
519        gate(&self.trust, &command, cwd.as_deref())?;
520        let captured = self.captured_env().await;
521        let (eff_cmd, eff_args) = env_wrap(&captured, &command, &args);
522        let params = exec_params(&eff_cmd, &eff_args, cwd.as_deref());
523
524        // Use streaming request to get live output
525        let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
526
527        let mut stdout = Vec::new();
528        let mut stderr = Vec::new();
529
530        // Collect streaming output
531        while let Some(data) = data_rx.recv().await {
532            if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
533                if let Ok(decoded) = decode_base64(out) {
534                    stdout.extend_from_slice(&decoded);
535                }
536            }
537            if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
538                if let Ok(decoded) = decode_base64(err) {
539                    stderr.extend_from_slice(&decoded);
540                }
541            }
542        }
543
544        // Get final result
545        let result = result_rx
546            .await
547            .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
548            .map_err(SpawnError::Process)?;
549
550        let exit_code = result
551            .get("code")
552            .and_then(|v| v.as_i64())
553            .map(|c| c as i32)
554            .unwrap_or(-1);
555
556        Ok(SpawnResult {
557            stdout: String::from_utf8_lossy(&stdout).to_string(),
558            stderr: String::from_utf8_lossy(&stderr).to_string(),
559            exit_code,
560        })
561    }
562
563    async fn spawn_to_file(
564        &self,
565        _command: String,
566        _args: Vec<String>,
567        _cwd: Option<String>,
568        _stdout_to: std::path::PathBuf,
569    ) -> Result<SpawnResult, SpawnError> {
570        Err(SpawnError::Process(
571            "stdoutTo is not supported for remote processes".to_string(),
572        ))
573    }
574}
575
/// A long-lived child process with piped stdio streams.
///
/// Wraps [`tokio::process::Child`] so the LSP code (and future callers
/// like plugin-managed tool agents) doesn't reach into concrete process
/// types — that way a container authority can transparently run the
/// child through `docker exec -i` while the caller keeps talking to an
/// ordinary stdin/stdout pair.
///
/// Streams are `Option`-wrapped so callers can [`Self::take_stdin`] /
/// [`Self::take_stdout`] / [`Self::take_stderr`] into their own reader
/// and writer tasks. After all streams are taken, the `StdioChild` is
/// still useful for lifecycle control via [`Self::kill`] and
/// [`Self::wait`].
///
/// `spawned_locally` tells callers whether `id()` names the real child
/// process (true for local spawns) or an intermediate like `docker` /
/// `ssh` (false). LSP's cgroup-attachment step keys off this — applying
/// a cgroup to the `docker` CLI PID doesn't constrain the container-
/// side server it exec'd.
pub struct StdioChild {
    /// The underlying process handle, used for `id`, `kill` and `wait`.
    inner: tokio::process::Child,
    /// Child stdin, moved out of `inner` at construction; `None` once
    /// taken (or if the child had no piped stdin).
    stdin: Option<ChildStdin>,
    /// Child stdout, handled the same way as `stdin`.
    stdout: Option<ChildStdout>,
    /// Child stderr, handled the same way as `stdin`.
    stderr: Option<ChildStderr>,
    /// Whether `inner`'s PID is the real target process rather than a
    /// wrapper CLI.
    spawned_locally: bool,
}
602
603impl StdioChild {
604    /// Construct a `StdioChild` from an already-spawned
605    /// `tokio::process::Child`. Pulls the piped streams out of the
606    /// child so callers can take them individually later.
607    ///
608    /// This constructor is for spawners that don't participate in
609    /// host-side resource limiting (the Docker variant is the
610    /// canonical example). Local spawners should prefer
611    /// [`Self::from_local_tokio_child`] so a `PostSpawnAction` produced
612    /// by [`ProcessLimits::apply_to_command`] is applied to the child's
613    /// PID before the spawner returns.
614    pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
615        let stdin = child.stdin.take();
616        let stdout = child.stdout.take();
617        let stderr = child.stderr.take();
618        Self {
619            inner: child,
620            stdin,
621            stdout,
622            stderr,
623            spawned_locally,
624        }
625    }
626
627    /// Construct a `StdioChild` for a locally-spawned child while
628    /// applying any host-side `PostSpawnAction` (cgroup attachment)
629    /// returned by [`ProcessLimits::apply_to_command`]. Best-effort:
630    /// failure to attach logs a warning but doesn't fail the spawn,
631    /// matching the pre-refactor behavior.
632    pub fn from_local_tokio_child(
633        child: tokio::process::Child,
634        post_spawn: PostSpawnAction,
635    ) -> Self {
636        let out = Self::from_tokio_child(child, true);
637        if let Some(pid) = out.inner.id() {
638            post_spawn.apply_to_child(pid);
639        }
640        out
641    }
642
643    /// Take the stdin stream. Returns `None` after the first call.
644    pub fn take_stdin(&mut self) -> Option<ChildStdin> {
645        self.stdin.take()
646    }
647
648    /// Take the stdout stream. Returns `None` after the first call.
649    pub fn take_stdout(&mut self) -> Option<ChildStdout> {
650        self.stdout.take()
651    }
652
653    /// Take the stderr stream. Returns `None` after the first call.
654    pub fn take_stderr(&mut self) -> Option<ChildStderr> {
655        self.stderr.take()
656    }
657
658    /// PID of the immediate child process. For local spawns this is
659    /// the LSP server itself; for docker/ssh this is the CLI wrapper.
660    /// Use [`Self::spawned_locally`] to tell which.
661    pub fn id(&self) -> Option<u32> {
662        self.inner.id()
663    }
664
665    /// `true` when the child PID names the real target process. Callers
666    /// that only apply host-side resource controls (cgroups, rlimits)
667    /// should skip their application when this is `false`.
668    pub fn spawned_locally(&self) -> bool {
669        self.spawned_locally
670    }
671
672    /// Request termination. Forwards to [`tokio::process::Child::kill`].
673    pub async fn kill(&mut self) -> std::io::Result<()> {
674        self.inner.kill().await
675    }
676
677    /// Await exit. Forwards to [`tokio::process::Child::wait`].
678    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
679        self.inner.wait().await
680    }
681}
682
/// Spawner for long-lived stdio processes (LSP servers, tool agents).
///
/// Separate from [`ProcessSpawner`] because the two APIs don't compose:
/// [`ProcessSpawner::spawn`] awaits completion and returns collected
/// output, whereas callers of `LongRunningSpawner` need a live child
/// they can read from and write to over time.
///
/// Authorities expose one of these alongside their filesystem and
/// one-shot spawner. Routing LSP spawning through it is what gives
/// container authorities in-container LSP without a special-cased
/// branch in `LspHandle`.
///
/// Callers pass an optional [`ProcessLimits`] block so local spawners
/// can honor host-side memory / CPU limits. Non-local variants (docker,
/// ssh) don't have a meaningful way to impose host limits on their
/// child — cgroups attached to the `docker` CLI PID don't reach into
/// the container — and are expected to ignore them.
#[async_trait::async_trait]
pub trait LongRunningSpawner: Send + Sync {
    /// Spawn `command` with `args` as a long-lived stdio child under
    /// this authority. Stdin/stdout/stderr are piped so the caller can
    /// hand them to dedicated reader/writer tasks.
    ///
    /// `env` is a list of extra `(KEY, VALUE)` pairs for the child;
    /// [`LocalLongRunningSpawner`] sets them after the provider-captured
    /// environment, so the caller's values win. `cwd`, when given, is the
    /// child's working directory. `limits`, when provided, lets local
    /// spawners attach cgroups or `setrlimit`; remote spawners are
    /// expected to ignore it (see trait docs).
    async fn spawn_stdio(
        &self,
        command: &str,
        args: &[String],
        env: Vec<(String, String)>,
        cwd: Option<&Path>,
        limits: Option<&ProcessLimits>,
    ) -> Result<StdioChild, SpawnError>;

    /// Check whether `command` resolves to an executable under this
    /// authority. Routed through the same spawner so an SSH authority
    /// probes the remote `$PATH` and a container authority probes the
    /// container's `$PATH` — unlike `which::which` which only ever sees
    /// the host.
    async fn command_exists(&self, command: &str) -> bool;
}
724
/// Local long-running spawner using `tokio::process::Command` directly.
///
/// Functionally equivalent to how `LspHandle::spawn` works today, but
/// exposed through the trait so non-local authorities can substitute
/// their own implementation without any LSP-side awareness. Applies
/// any `ProcessLimits` passed in via the same machinery the
/// pre-refactor LSP code used (`apply_to_command` + `apply_to_child`).
pub struct LocalLongRunningSpawner {
    /// Provider whose captured environment is set on each child before
    /// the per-call env.
    env: Arc<EnvProvider>,
    /// Workspace-trust state passed to `gate` before every spawn.
    trust: Arc<WorkspaceTrust>,
}
736
737impl LocalLongRunningSpawner {
738    /// Local long-running spawner gated by `trust`, applying the live `env`
739    /// provider's captured environment under each child's environment.
740    pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
741        Self { env, trust }
742    }
743}
744
745#[async_trait::async_trait]
746impl LongRunningSpawner for LocalLongRunningSpawner {
747    async fn spawn_stdio(
748        &self,
749        command: &str,
750        args: &[String],
751        env: Vec<(String, String)>,
752        cwd: Option<&Path>,
753        limits: Option<&ProcessLimits>,
754    ) -> Result<StdioChild, SpawnError> {
755        gate(
756            &self.trust,
757            command,
758            cwd.map(|p| p.to_string_lossy()).as_deref(),
759        )?;
760        let captured = local_captured_env(&self.env).await;
761        let mut cmd = tokio::process::Command::new(resolve_program(command).as_ref());
762        cmd.args(args)
763            // Provider env first, then the per-call env so the caller wins.
764            .envs(captured.iter().map(|(k, v)| (k.as_str(), v.as_str())))
765            .envs(env)
766            .stdin(std::process::Stdio::piped())
767            .stdout(std::process::Stdio::piped())
768            .stderr(std::process::Stdio::piped())
769            .hide_window()
770            .kill_on_drop(true);
771        if let Some(dir) = cwd {
772            cmd.current_dir(dir);
773        }
774
775        // Apply pre-spawn hooks (cgroup path selection, setrlimit
776        // via `pre_exec`). Errors bubble up so callers see
777        // configuration problems early — matches the pre-refactor
778        // LSP behavior.
779        let post_spawn = match limits {
780            Some(lim) => lim
781                .apply_to_command(&mut cmd)
782                .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
783            None => PostSpawnAction::default(),
784        };
785
786        let child = cmd
787            .spawn()
788            .map_err(|e| SpawnError::Process(e.to_string()))?;
789        Ok(StdioChild::from_local_tokio_child(child, post_spawn))
790    }
791
792    async fn command_exists(&self, command: &str) -> bool {
793        // Honor the active env's PATH (e.g. a venv's `bin/`) so the existence
794        // probe searches the same place `spawn_stdio` will — otherwise a
795        // repo-local `pyright`/`ruff` looks missing and the server never
796        // starts. Falls back to the process PATH when no env is active.
797        let captured = local_captured_env(&self.env).await;
798        if let Some((_, path)) = captured.iter().find(|(k, _)| k == "PATH") {
799            let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
800            return which::which_in(command, Some(path), &cwd).is_ok();
801        }
802        which::which(command).is_ok()
803    }
804}
805
/// Quote `s` as a single POSIX-shell word.
///
/// The text is wrapped in single quotes; each embedded single quote is
/// rewritten as `'\''` (close the quoted run, emit an escaped quote, reopen).
/// The result can be spliced into a command string that a POSIX shell will
/// parse.
fn shell_quote(s: &str) -> String {
    let escaped = s.replace('\'', "'\\''");
    format!("'{escaped}'")
}
821
822/// Build the remote shell command for a long-running process:
823/// `[cd <cwd> && ]exec env K=V… <command> <args…>` (all shell-quoted).
824/// `exec` replaces the login shell so the server *is* the SSH channel's
825/// process — EOF/kill propagate to it directly. `env` applies the injected
826/// environment then execs the real binary.
827fn build_remote_exec(
828    env: &[(String, String)],
829    cwd: Option<&str>,
830    command: &str,
831    args: &[String],
832) -> String {
833    let mut s = String::new();
834    if let Some(dir) = cwd {
835        s.push_str("cd ");
836        s.push_str(&shell_quote(dir));
837        s.push_str(" && ");
838    }
839    s.push_str("exec ");
840    if !env.is_empty() {
841        s.push_str("env ");
842        for (k, v) in env {
843            s.push_str(k);
844            s.push('=');
845            s.push_str(&shell_quote(v));
846            s.push(' ');
847        }
848    }
849    s.push_str(&shell_quote(command));
850    for a in args {
851        s.push(' ');
852        s.push_str(&shell_quote(a));
853    }
854    s
855}
856
857/// Build the remote command for an existence probe. `command -v` is a shell
858/// builtin (not a binary), so the env is applied via `export` assignments
859/// rather than the `env` binary.
860fn build_remote_command_exists(env: &[(String, String)], command: &str) -> String {
861    let mut s = String::new();
862    for (k, v) in env {
863        s.push_str("export ");
864        s.push_str(k);
865        s.push('=');
866        s.push_str(&shell_quote(v));
867        s.push_str("; ");
868    }
869    s.push_str("command -v ");
870    s.push_str(&shell_quote(command));
871    s.push_str(" >/dev/null 2>&1");
872    s
873}
874
875/// Assemble the `ssh` argv: connection options, `user@host`, then the single
876/// remote command string (ssh concatenates trailing args with spaces, so the
877/// command must already be one shell-quoted string). Mirrors the options the
878/// agent connection uses.
879fn build_ssh_args(
880    params: &crate::services::remote::ConnectionParams,
881    remote_cmd: &str,
882) -> Vec<String> {
883    let mut a = vec![
884        "-o".to_string(),
885        "StrictHostKeyChecking=accept-new".to_string(),
886        "-o".to_string(),
887        "BatchMode=yes".to_string(),
888    ];
889    if let Some(port) = params.port {
890        a.push("-p".to_string());
891        a.push(port.to_string());
892    }
893    if let Some(ref identity) = params.identity_file {
894        a.push("-i".to_string());
895        a.push(identity.to_string_lossy().into_owned());
896    }
897    a.extend(params.extra_args.iter().cloned());
898    a.push(params.ssh_target());
899    a.push(remote_cmd.to_string());
900    a
901}
902
903/// Assemble the `ssh` argv for an *interactive terminal* under a remote
904/// authority. Unlike [`build_ssh_args`] (one-shot, non-interactive LSP /
905/// probe spawns) this:
906///
907/// * forces remote PTY allocation with `-t` so the remote shell behaves
908///   interactively (job control, line editing, a real prompt);
909/// * omits `BatchMode=yes` so auth prompts (key passphrase, password,
910///   2FA) can surface *inside* the embedded terminal rather than failing;
911/// * runs an interactive login shell (`exec ${SHELL:-/bin/sh} -l`) after
912///   `cd`-ing into the workspace, so the user lands where the editor is
913///   rooted with their normal remote environment.
914///
915/// Returned as the argv *after* the leading `ssh` program name; the caller
916/// (the SSH terminal wrapper) sets `command = "ssh"` and these as `args`.
917pub fn build_ssh_terminal_args(
918    params: &crate::services::remote::ConnectionParams,
919    remote_dir: Option<&str>,
920) -> Vec<String> {
921    build_ssh_remote_args(params, remote_dir, SSH_EXEC_LOGIN_SHELL)
922}
923
924/// Build the `ssh` argv that runs an interactive *agent* `argv` on the remote
925/// host, rooted at the workspace dir — the agent analogue of
926/// [`build_ssh_terminal_args`]. `ssh` has no cwd flag, so cwd is pinned through
927/// the same `cd <dir>` shell hop the bare terminal uses; the agent argv is then
928/// handed to a remote **login** shell (`exec $SHELL -lc 'exec <argv>'`) so its
929/// profile-derived `PATH` resolves the agent binary exactly as the bare
930/// terminal would. The argv is POSIX-quoted, so paths/args with spaces survive
931/// the remote shell's re-parse intact. Without this an agent command under an
932/// SSH session ran on the **local** host instead of the remote.
933pub fn build_ssh_agent_terminal_args(
934    params: &crate::services::remote::ConnectionParams,
935    remote_dir: Option<&str>,
936    argv: &[String],
937) -> Vec<String> {
938    build_ssh_remote_args(params, remote_dir, &agent_login_exec_tail(argv))
939}
940
941/// Shared body of the SSH integrated-terminal argv: the `ssh` flags + target,
942/// then a remote command that lands in the workspace and runs `exec_tail`. The
943/// bare terminal passes [`SSH_EXEC_LOGIN_SHELL`]; the agent terminal passes
944/// [`agent_login_exec_tail`]. Kept as one function so the cwd-landing logic
945/// (and the `-t` / StrictHostKeyChecking / `-p` / `-i` / extra-args assembly)
946/// stays identical between the two.
947fn build_ssh_remote_args(
948    params: &crate::services::remote::ConnectionParams,
949    remote_dir: Option<&str>,
950    exec_tail: &str,
951) -> Vec<String> {
952    let mut a = vec![
953        "-t".to_string(),
954        "-o".to_string(),
955        "StrictHostKeyChecking=accept-new".to_string(),
956    ];
957    if let Some(port) = params.port {
958        a.push("-p".to_string());
959        a.push(port.to_string());
960    }
961    if let Some(ref identity) = params.identity_file {
962        a.push("-i".to_string());
963        a.push(identity.to_string_lossy().into_owned());
964    }
965    a.extend(params.extra_args.iter().cloned());
966    a.push(params.ssh_target());
967
968    // Land in the workspace (when known), then run `exec_tail`. `remote_dir`
969    // is whatever path the URL pointed at, which may be a *file* (`fresh
970    // ssh://host/proj/main.rs`) — so fall back to its parent dir, and treat a
971    // failed `cd` as non-fatal so the command always starts. `exec` replaces
972    // the ssh-side shell so closing the terminal tears the session down
973    // cleanly.
974    let mut remote_cmd = String::new();
975    if let Some(dir) = remote_dir.filter(|d| !d.is_empty()) {
976        let quoted = shell_quote(dir);
977        remote_cmd.push_str(&format!(
978            "d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
979        ));
980    }
981    remote_cmd.push_str(exec_tail);
982    a.push(remote_cmd);
983    a
984}
985
986/// The `exec ${SHELL:-/bin/sh} -lc '<exec argv…>'` tail shared by the SSH and
987/// K8s **agent** terminals: hand the agent argv to a remote **login** shell (so
988/// its profile-derived `PATH` resolves the agent binary), which `exec`s it as
989/// the session leader (so closing the terminal tears it down). Each argv token
990/// is POSIX-quoted, then the whole `exec …` string is quoted again as the
991/// single `-c` argument, so the argv survives the remote shell's re-parse with
992/// spaces/metacharacters intact. `argv` is always non-empty here (the empty
993/// case falls back to the bare-shell wrapper before reaching this).
994fn agent_login_exec_tail(argv: &[String]) -> String {
995    let joined = argv
996        .iter()
997        .map(|a| shell_quote(a))
998        .collect::<Vec<_>>()
999        .join(" ");
1000    format!(
1001        "exec ${{SHELL:-/bin/sh}} -lc {}",
1002        shell_quote(&format!("exec {joined}"))
1003    )
1004}
1005
/// The tail of the SSH terminal's remote command: replace the parsing shell
/// (`exec`) with a login shell (`-l`), using `$SHELL` when it is set and
/// non-empty and `/bin/sh` otherwise. Factored into a constant so the
/// activated-env path can find and rewrite exactly this segment into a
/// launcher (see [`ssh_remote_env_launcher`]) without re-deriving the whole
/// command.
pub const SSH_EXEC_LOGIN_SHELL: &str = "exec ${SHELL:-/bin/sh} -l";
1011
/// Build the replacement for [`SSH_EXEC_LOGIN_SHELL`] that applies the activated
/// environment (venv/direnv/mise) in the SSH-backed integrated terminal before
/// handing off to the user's login shell — the remote analogue of the local
/// terminal's `CommandBuilder.env` injection (issue #2355; see
/// `docs/internal/uniform-env-activation-design.md`).
///
/// The result is `exec python3 -c '<literal>'`. python3 is already required on
/// every SSH remote (it runs the agent), so this adds no dependency and needs
/// no agent-PTY work — the existing `ssh -t` keeps providing the PTY. The
/// `'<literal>'` is a single shell-literal token containing no single quotes, so
/// the user's *login* shell (which `ssh` uses to parse the command — possibly
/// fish) passes it through verbatim regardless of its quoting rules. The literal
/// base64-decodes and `exec`s a python launcher that, on the remote: captures
/// the activation **delta** (run the recipe in `bash`, diff against a clean
/// login env), applies it to `os.environ` as data — never re-parsed by the
/// user's interactive shell — then `exec`s `$SHELL -l`. Capturing on the remote
/// keeps everything one round-trip and always fresh; the recipe is embedded as
/// a JSON string literal (safe for any byte content).
///
/// `recipe` is the activation snippet to run; the return value is the remote
/// command string. If the recipe cannot be serialized to JSON, an empty
/// recipe (`""`) is embedded instead.
pub fn ssh_remote_env_launcher(recipe: &str) -> String {
    use base64::{engine::general_purpose::STANDARD as BASE64, Engine};

    // The JSON string literal is spliced into the python source as `_r`.
    let recipe_json = serde_json::to_string(recipe).unwrap_or_else(|_| "\"\"".to_string());
    // NB: the Rust raw string is the *python source*, passed through without
    // escape processing. The `\\n` below therefore reaches python as
    // backslash-backslash-n, which python's string literal reduces to
    // backslash-n — the two characters `printf` turns into a newline.
    // `{{` / `}}` are literal braces (escaped for `format!`).
    //
    // Launcher outline: print the environment, a sentinel line, run the
    // recipe, print the environment again (all in one `bash -lc`); parse both
    // snapshots as `KEY=VALUE` lines; set every variable whose value differs
    // after the recipe and remove every variable that disappeared; finally
    // `execvp` `$SHELL -l` (or `/bin/sh`). If bash cannot be run, or the
    // sentinel is missing from its output, the environment is left untouched.
    //
    // NOTE(review): if bash stops before the second `command env` runs (for
    // example the recipe calls `exit`), the "after" snapshot parses as empty
    // and every variable from the first snapshot is removed from
    // `os.environ`. Confirm this is acceptable or guard on a non-empty
    // second snapshot.
    let launcher_src = format!(
        r#"import os,subprocess
_r={recipe_json}
_S="{sentinel}"
_script="command env; printf '%s\\n' '"+_S+"'; "+_r+"; command env"
try:
    _o=subprocess.run(["bash","-lc",_script],stdout=subprocess.PIPE,stderr=subprocess.DEVNULL).stdout.decode("utf-8","replace")
except Exception:
    _o=""
def _p(t):
    d={{}}
    for ln in t.splitlines():
        i=ln.find("=")
        if i>0: d[ln[:i]]=ln[i+1:]
    return d
if _S in _o:
    _b,_a=_o.split(_S,1)
    _bb=_p(_b); _aa=_p(_a)
    for k,v in _aa.items():
        if _bb.get(k)!=v: os.environ[k]=v
    for k in list(_bb):
        if k not in _aa: os.environ.pop(k,None)
_sh=os.environ.get("SHELL") or "/bin/sh"
os.execvp(_sh,[_sh,"-l"])
"#,
        sentinel = crate::services::env_provider::DELTA_SENTINEL,
    );

    // Base64 keeps the payload free of single quotes, so the whole python
    // one-liner can sit inside one single-quoted shell token.
    let b64 = BASE64.encode(launcher_src.as_bytes());
    format!("exec python3 -c 'import base64;exec(base64.b64decode(\"{b64}\").decode())'")
}
1068
1069/// Build the `kubectl` argv that opens the integrated terminal as an
1070/// interactive login shell *inside the pod*:
1071///
1072/// ```text
1073/// [--context CTX] exec -it -n NS [-c C] POD -- sh -lc 'cd WS; exec "$SHELL" -l'
1074/// ```
1075///
1076/// The K8s analogue of [`build_ssh_terminal_args`]. `-it` allocates a TTY (so
1077/// resize / curses apps work — `kubectl` itself implements the resize
1078/// protocol), and the `sh -lc` wrapper pins cwd, so the authority's terminal
1079/// wrapper sets `manages_cwd = true`. A failed `cd` is non-fatal so the shell
1080/// always starts; `exec` replaces the wrapper shell so closing the terminal
1081/// tears the exec session down cleanly.
1082pub fn build_kube_terminal_args(
1083    target: &crate::services::remote::KubeTarget,
1084    base_env: &[(String, String)],
1085) -> Vec<String> {
1086    build_kube_remote_args(target, base_env, "exec ${SHELL:-/bin/sh} -l")
1087}
1088
1089/// Build the `kubectl` argv that runs an interactive *agent* `argv` inside the
1090/// pod, rooted at the workspace dir — the agent analogue of
1091/// [`build_kube_terminal_args`]. `kubectl exec` has no cwd flag, so cwd is
1092/// pinned through the same `cd <ws>` shell hop the bare terminal uses, and the
1093/// agent argv is handed to a pod-side **login** shell (`exec $SHELL -lc 'exec
1094/// <argv>'`) so its `PATH` resolves the agent binary. Without this an agent
1095/// command under a K8s session ran on the **local** host instead of the pod.
1096pub fn build_kube_agent_terminal_args(
1097    target: &crate::services::remote::KubeTarget,
1098    base_env: &[(String, String)],
1099    argv: &[String],
1100) -> Vec<String> {
1101    build_kube_remote_args(target, base_env, &agent_login_exec_tail(argv))
1102}
1103
1104/// Shared body of the K8s integrated-terminal argv: export the in-pod env
1105/// probe, land in the workspace, then run `exec_tail` inside a `sh -lc`
1106/// wrapper. The bare terminal passes `exec ${SHELL:-/bin/sh} -l`; the agent
1107/// terminal passes [`agent_login_exec_tail`]. Kept as one function so the
1108/// env-export + cwd-landing logic stays identical between the two.
1109fn build_kube_remote_args(
1110    target: &crate::services::remote::KubeTarget,
1111    base_env: &[(String, String)],
1112    exec_tail: &str,
1113) -> Vec<String> {
1114    let mut remote_cmd = String::new();
1115    // Apply the captured in-pod env probe to the integrated terminal so it
1116    // matches what LSP / spawnProcess get in the pod (issue #2355; see
1117    // docs/internal/uniform-env-activation-design.md). `kubectl exec` has no
1118    // `-e` flag, so — like the documented `kubectl exec -- env …` / `sh -c
1119    // 'export …'` workarounds — we `export` each pair inside the `sh -lc`
1120    // wrapper we already control. Values are POSIX-quoted (the parser is the
1121    // `sh` we spawn, not the user's interactive shell), so this is data, not
1122    // shell-injected. Exports come first so the subsequent `cd`/login shell
1123    // inherit them.
1124    for (k, v) in base_env {
1125        remote_cmd.push_str(&format!("export {}={}; ", k, shell_quote(v)));
1126    }
1127    if let Some(dir) = target.workspace.as_deref().filter(|d| !d.is_empty()) {
1128        let quoted = shell_quote(dir);
1129        remote_cmd.push_str(&format!(
1130            "d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
1131        ));
1132    }
1133    remote_cmd.push_str(exec_tail);
1134    crate::services::remote::transport::kubectl_exec_argv(
1135        target,
1136        &["-it"],
1137        "sh",
1138        &["-lc".to_string(), remote_cmd],
1139    )
1140}
1141
1142/// Long-running spawner over SSH: each LSP server (or tool agent) gets its own
1143/// `ssh user@host <remote-cmd>` subprocess, whose piped stdio *is* the remote
1144/// process's stdio. Returning a real local [`tokio::process::Child`] (the ssh
1145/// client) means the LSP I/O layer talks to ordinary `ChildStdin`/`ChildStdout`
1146/// with no awareness it's remote — the same trick the Docker spawner uses with
1147/// the local `docker` CLI.
1148///
1149/// This opens a separate SSH connection per server rather than multiplexing
1150/// through the agent: the agent's one-shot `exec` can't keep a process alive
1151/// with writable stdin, and abstracting `StdioChild` / the whole LSP I/O layer
1152/// over the agent channel would be a far larger change. The tradeoff is extra
1153/// SSH connections; the win is LSP that actually runs on the remote host
1154/// instead of the host-local fallback.
1155pub struct RemoteLongRunningSpawner {
1156    params: crate::services::remote::ConnectionParams,
1157    env: Arc<EnvProvider>,
1158    trust: Arc<WorkspaceTrust>,
1159}
1160
1161impl RemoteLongRunningSpawner {
1162    /// Spawner for `params`, gated by `trust`, applying the live `env` provider
1163    /// (captured on the remote host) to every server it launches.
1164    pub fn new(
1165        params: crate::services::remote::ConnectionParams,
1166        env: Arc<EnvProvider>,
1167        trust: Arc<WorkspaceTrust>,
1168    ) -> Self {
1169        Self { params, env, trust }
1170    }
1171
1172    /// Capture the active env on the *remote* host: run the provider's script
1173    /// through a one-shot `ssh … <script>` (raw — no env applied).
1174    async fn captured_env(&self) -> Vec<(String, String)> {
1175        let params = self.params.clone();
1176        self.env
1177            .current(move |script| async move {
1178                let ssh_args = build_ssh_args(&params, &script);
1179                let output = tokio::process::Command::new("ssh")
1180                    .args(&ssh_args)
1181                    .hide_window()
1182                    .output()
1183                    .await
1184                    .ok()?;
1185                Some(String::from_utf8_lossy(&output.stdout).into_owned())
1186            })
1187            .await
1188    }
1189}
1190
1191#[async_trait::async_trait]
1192impl LongRunningSpawner for RemoteLongRunningSpawner {
1193    async fn spawn_stdio(
1194        &self,
1195        command: &str,
1196        args: &[String],
1197        env: Vec<(String, String)>,
1198        cwd: Option<&Path>,
1199        _limits: Option<&ProcessLimits>,
1200    ) -> Result<StdioChild, SpawnError> {
1201        // Host-side process limits don't reach a remote process (the local
1202        // PID is the ssh client), so `_limits` is ignored — same as Docker.
1203        let cwd_str = cwd.map(|p| p.to_string_lossy().into_owned());
1204        gate(&self.trust, command, cwd_str.as_deref())?;
1205
1206        // Captured (provider) env first, then the per-call env so the caller
1207        // wins on conflict (mirrors the local layering).
1208        let mut merged = self.captured_env().await;
1209        merged.extend(env);
1210
1211        let remote = build_remote_exec(&merged, cwd_str.as_deref(), command, args);
1212        let ssh_args = build_ssh_args(&self.params, &remote);
1213
1214        let mut cmd = tokio::process::Command::new("ssh");
1215        cmd.args(&ssh_args)
1216            .stdin(std::process::Stdio::piped())
1217            .stdout(std::process::Stdio::piped())
1218            .stderr(std::process::Stdio::piped())
1219            .hide_window()
1220            .kill_on_drop(true);
1221
1222        let child = cmd
1223            .spawn()
1224            .map_err(|e| SpawnError::Process(e.to_string()))?;
1225        // `spawned_locally = false`: the local PID is the ssh client, not the
1226        // remote server, so host-only resource controls skip themselves.
1227        Ok(StdioChild::from_tokio_child(child, false))
1228    }
1229
1230    async fn command_exists(&self, command: &str) -> bool {
1231        let captured = self.captured_env().await;
1232        let remote = build_remote_command_exists(&captured, command);
1233        let ssh_args = build_ssh_args(&self.params, &remote);
1234        match tokio::process::Command::new("ssh")
1235            .args(&ssh_args)
1236            .hide_window()
1237            .output()
1238            .await
1239        {
1240            Ok(output) => output.status.success(),
1241            Err(_) => false,
1242        }
1243    }
1244}
1245
1246#[cfg(test)]
1247mod tests {
1248    use super::*;
1249    use tokio::io::AsyncReadExt;
1250
1251    // On non-Windows, `resolve_program` is a pure pass-through: PATH lookup
1252    // at spawn time already does the right thing, so the command string must
1253    // be returned untouched (no surprise absolute-path rewrites).
1254    #[cfg(not(windows))]
1255    #[test]
1256    fn resolve_program_is_passthrough_on_unix() {
1257        assert_eq!(
1258            resolve_program("typescript-language-server"),
1259            "typescript-language-server"
1260        );
1261        assert_eq!(resolve_program("sh"), "sh");
1262        assert_eq!(resolve_program(""), "");
1263    }
1264
1265    // On Windows, an unresolvable name must fall back to itself so the spawn
1266    // still surfaces a meaningful "not found" error rather than a panic, while
1267    // a real binary resolves to a full, spawnable path.
1268    #[cfg(windows)]
1269    #[test]
1270    fn resolve_program_falls_back_and_resolves_on_windows() {
1271        assert_eq!(
1272            resolve_program("fresh-unlikely-binary-name-ygzu9"),
1273            "fresh-unlikely-binary-name-ygzu9"
1274        );
1275        // `cmd` is always present on Windows and resolves to an absolute path.
1276        let resolved = resolve_program("cmd");
1277        assert!(
1278            std::path::Path::new(resolved.as_ref()).is_absolute(),
1279            "expected an absolute path, got {resolved:?}"
1280        );
1281    }
1282
1283    #[tokio::test]
1284    async fn test_local_spawner() {
1285        let spawner = LocalProcessSpawner::new(
1286            Arc::new(EnvProvider::inactive()),
1287            Arc::new(WorkspaceTrust::permissive()),
1288        );
1289        let result = spawner
1290            .spawn("echo".to_string(), vec!["hello".to_string()], None)
1291            .await
1292            .unwrap();
1293
1294        assert_eq!(result.exit_code, 0);
1295        assert!(result.stdout.trim() == "hello");
1296    }
1297
1298    #[tokio::test]
1299    async fn test_local_spawner_stdout_to_file() {
1300        let spawner = LocalProcessSpawner::new(
1301            Arc::new(EnvProvider::inactive()),
1302            Arc::new(WorkspaceTrust::permissive()),
1303        );
1304        let tmp =
1305            std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
1306        // Best-effort cleanup of any leftover from a previous run.
1307        // Failure (e.g. NotFound) is fine — the spawn below will
1308        // create the file fresh.
1309        #[allow(clippy::let_underscore_must_use)]
1310        let _ = std::fs::remove_file(&tmp);
1311        let result = spawner
1312            .spawn_to_file(
1313                "echo".to_string(),
1314                vec!["hello-from-disk".to_string()],
1315                None,
1316                tmp.clone(),
1317            )
1318            .await
1319            .unwrap();
1320
1321        assert_eq!(result.exit_code, 0);
1322        assert!(
1323            result.stdout.is_empty(),
1324            "stdout should be empty when streaming"
1325        );
1326        let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
1327        assert_eq!(contents.trim(), "hello-from-disk");
1328        // Best-effort cleanup — leaving a temp file behind on
1329        // failure is acceptable and the next run's pre-cleanup
1330        // handles it.
1331        #[allow(clippy::let_underscore_must_use)]
1332        let _ = std::fs::remove_file(&tmp);
1333    }
1334
1335    #[tokio::test]
1336    async fn test_local_spawner_cancellable_kill() {
1337        let spawner = LocalProcessSpawner::new(
1338            Arc::new(EnvProvider::inactive()),
1339            Arc::new(WorkspaceTrust::permissive()),
1340        );
1341        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
1342
1343        // Start a sleep that would take 30s normally; fire kill after 100ms.
1344        let task = tokio::spawn(async move {
1345            spawner
1346                .spawn_cancellable(
1347                    "sleep".to_string(),
1348                    vec!["30".to_string()],
1349                    None,
1350                    None,
1351                    kill_rx,
1352                )
1353                .await
1354        });
1355
1356        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1357        // Fire the kill. Err means the receiver was dropped (task
1358        // already finished), which would mean the 30s sleep returned
1359        // promptly on its own — impossible in this test window, but
1360        // not worth a panic either way; the subsequent task.await
1361        // surfaces any real problem.
1362        #[allow(clippy::let_underscore_must_use)]
1363        let _ = kill_tx.send(());
1364
1365        let start = std::time::Instant::now();
1366        let result = task.await.unwrap().unwrap();
1367        let elapsed = start.elapsed();
1368
1369        // SIGKILL'd sleep on Unix returns exit_code 137 or -1 (no code).
1370        // The point is we returned promptly, not after 30s.
1371        assert!(
1372            elapsed < std::time::Duration::from_secs(5),
1373            "kill should be prompt, took {:?}",
1374            elapsed
1375        );
1376        assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
1377    }
1378
1379    #[tokio::test]
1380    async fn local_long_running_spawn_stdio_pipes_output() {
1381        let spawner = LocalLongRunningSpawner::new(
1382            Arc::new(EnvProvider::inactive()),
1383            Arc::new(WorkspaceTrust::permissive()),
1384        );
1385        let mut child = spawner
1386            .spawn_stdio(
1387                "sh",
1388                &["-c".into(), "echo hi".into()],
1389                Vec::new(),
1390                None,
1391                None,
1392            )
1393            .await
1394            .expect("spawn succeeds");
1395
1396        let mut stdout = child.take_stdout().expect("stdout piped");
1397        let mut buf = String::new();
1398        stdout.read_to_string(&mut buf).await.unwrap();
1399        assert_eq!(buf.trim(), "hi");
1400
1401        let status = child.wait().await.unwrap();
1402        assert!(status.success());
1403        assert!(child.spawned_locally());
1404    }
1405
1406    #[tokio::test]
1407    async fn local_long_running_command_exists_for_sh() {
1408        let spawner = LocalLongRunningSpawner::new(
1409            Arc::new(EnvProvider::inactive()),
1410            Arc::new(WorkspaceTrust::permissive()),
1411        );
1412        assert!(spawner.command_exists("sh").await);
1413        assert!(
1414            !spawner
1415                .command_exists("fresh-unlikely-binary-name-ygzu9")
1416                .await
1417        );
1418    }
1419
1420    // Unix-only: the local env capture runs the recipe through a POSIX login
1421    // shell (`$SHELL -lc`, falling back to `/bin/sh`). On Windows there is no
1422    // such shell, so capture intentionally no-ops — there's nothing to assert.
1423    #[cfg(unix)]
1424    #[tokio::test]
1425    async fn local_spawner_applies_active_env_provider() {
1426        // Full local path: an active provider whose snippet exports a var →
1427        // captured via the login shell → injected into the spawned child.
1428        let env = Arc::new(EnvProvider::inactive());
1429        env.set("export FRESH_ENV_TEST=hi-from-provider".into(), None);
1430        let spawner = LocalProcessSpawner::new(env, Arc::new(WorkspaceTrust::permissive()));
1431        let result = spawner
1432            .spawn(
1433                "sh".into(),
1434                vec!["-c".into(), "printf %s \"$FRESH_ENV_TEST\"".into()],
1435                None,
1436            )
1437            .await
1438            .unwrap();
1439        assert_eq!(result.exit_code, 0);
1440        assert_eq!(result.stdout, "hi-from-provider");
1441    }
1442
1443    #[tokio::test]
1444    async fn local_spawner_inactive_provider_injects_nothing() {
1445        let spawner = LocalProcessSpawner::new(
1446            Arc::new(EnvProvider::inactive()),
1447            Arc::new(WorkspaceTrust::permissive()),
1448        );
1449        let result = spawner
1450            .spawn(
1451                "sh".into(),
1452                vec!["-c".into(), "printf %s \"${FRESH_ENV_TEST:-unset}\"".into()],
1453                None,
1454            )
1455            .await
1456            .unwrap();
1457        assert_eq!(result.stdout, "unset");
1458    }
1459
1460    // --- RemoteLongRunningSpawner command builders (pure, no SSH needed) ---
1461
1462    #[test]
1463    fn shell_quote_wraps_and_escapes() {
1464        assert_eq!(shell_quote("abc"), "'abc'");
1465        assert_eq!(shell_quote("a b/c"), "'a b/c'");
1466        assert_eq!(shell_quote("a'b"), "'a'\\''b'");
1467    }
1468
1469    #[test]
1470    fn build_remote_exec_with_cwd_and_env() {
1471        let env = vec![("VIRTUAL_ENV".to_string(), "/proj/.venv".to_string())];
1472        let s = build_remote_exec(&env, Some("/proj dir"), "python", &["x.py".to_string()]);
1473        assert_eq!(
1474            s,
1475            "cd '/proj dir' && exec env VIRTUAL_ENV='/proj/.venv' 'python' 'x.py'"
1476        );
1477    }
1478
1479    #[test]
1480    fn build_remote_exec_minimal() {
1481        assert_eq!(build_remote_exec(&[], None, "gopls", &[]), "exec 'gopls'");
1482    }
1483
1484    #[test]
1485    fn build_remote_command_exists_exports_env() {
1486        let env = vec![("PATH".to_string(), "/proj/.venv/bin:/usr/bin".to_string())];
1487        assert_eq!(
1488            build_remote_command_exists(&env, "pyright"),
1489            "export PATH='/proj/.venv/bin:/usr/bin'; command -v 'pyright' >/dev/null 2>&1"
1490        );
1491    }
1492
1493    #[test]
1494    fn build_ssh_args_full() {
1495        let params = crate::services::remote::ConnectionParams {
1496            user: Some("u".into()),
1497            host: "h".into(),
1498            port: Some(2222),
1499            identity_file: Some(std::path::PathBuf::from("/k")),
1500            extra_args: Vec::new(),
1501        };
1502        let a = build_ssh_args(&params, "echo hi");
1503        let expected: Vec<String> = [
1504            "-o",
1505            "StrictHostKeyChecking=accept-new",
1506            "-o",
1507            "BatchMode=yes",
1508            "-p",
1509            "2222",
1510            "-i",
1511            "/k",
1512            "u@h",
1513            "echo hi",
1514        ]
1515        .into_iter()
1516        .map(String::from)
1517        .collect();
1518        assert_eq!(a, expected);
1519    }
1520
1521    #[test]
1522    fn build_ssh_args_omits_user_and_threads_extra_args() {
1523        // No user → bare host target; extra args land verbatim before it.
1524        let params = crate::services::remote::ConnectionParams {
1525            user: None,
1526            host: "h".into(),
1527            port: None,
1528            identity_file: None,
1529            extra_args: vec!["-J".into(), "jump".into()],
1530        };
1531        let a = build_ssh_args(&params, "echo hi");
1532        let expected: Vec<String> = [
1533            "-o",
1534            "StrictHostKeyChecking=accept-new",
1535            "-o",
1536            "BatchMode=yes",
1537            "-J",
1538            "jump",
1539            "h",
1540            "echo hi",
1541        ]
1542        .into_iter()
1543        .map(String::from)
1544        .collect();
1545        assert_eq!(a, expected);
1546    }
1547
1548    #[test]
1549    fn build_ssh_terminal_args_forces_tty_and_login_shell() {
1550        let params = crate::services::remote::ConnectionParams {
1551            user: Some("u".into()),
1552            host: "h".into(),
1553            port: Some(2222),
1554            identity_file: Some(std::path::PathBuf::from("/k")),
1555            extra_args: Vec::new(),
1556        };
1557        let a = build_ssh_terminal_args(&params, Some("/proj dir"));
1558        let expected: Vec<String> = [
1559            "-t",
1560            "-o",
1561            "StrictHostKeyChecking=accept-new",
1562            "-p",
1563            "2222",
1564            "-i",
1565            "/k",
1566            "u@h",
1567            "d='/proj dir'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
1568        ]
1569        .into_iter()
1570        .map(String::from)
1571        .collect();
1572        assert_eq!(a, expected);
1573        // No BatchMode — interactive auth must be able to prompt in the PTY.
1574        assert!(!a.iter().any(|s| s == "BatchMode=yes"));
1575        // The exec tail is exactly the shared constant the env path rewrites.
1576        assert!(a.last().unwrap().ends_with(SSH_EXEC_LOGIN_SHELL));
1577    }
1578
1579    #[test]
1580    fn ssh_remote_env_launcher_is_a_safe_single_quoted_python_oneliner() {
1581        use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
1582
1583        let recipe = "eval \"$(direnv export bash)\"";
1584        let launcher = ssh_remote_env_launcher(recipe);
1585
1586        // Shape: replaces the login-shell exec with a python3 -c invocation.
1587        assert!(launcher.starts_with("exec python3 -c '"));
1588        assert!(launcher.ends_with('\''));
1589        // The python argument is a single shell-literal token: no *inner* single
1590        // quotes, so the user's login shell (bash/zsh/fish) passes it verbatim
1591        // regardless of its quoting rules.
1592        let inner = launcher
1593            .trim_start_matches("exec python3 -c '")
1594            .trim_end_matches('\'');
1595        assert!(
1596            !inner.contains('\''),
1597            "inner literal must not contain a single quote"
1598        );
1599
1600        // The base64 blob decodes to python that embeds the recipe and splits on
1601        // the same sentinel the local delta capture uses.
1602        let b64 = inner
1603            .trim_start_matches("import base64;exec(base64.b64decode(\"")
1604            .trim_end_matches("\").decode())");
1605        let src = String::from_utf8(BASE64.decode(b64).unwrap()).unwrap();
1606        assert!(
1607            src.contains("direnv export bash"),
1608            "recipe must be embedded"
1609        );
1610        assert!(src.contains(crate::services::env_provider::DELTA_SENTINEL));
1611        assert!(src.contains("os.execvp"));
1612    }
1613
1614    #[test]
1615    fn ssh_launcher_embeds_recipes_with_quotes_safely() {
1616        // A recipe containing single quotes must not break the outer literal.
1617        let recipe = "export X='a b'; source ./.venv/bin/activate";
1618        let launcher = ssh_remote_env_launcher(recipe);
1619        let inner = launcher
1620            .trim_start_matches("exec python3 -c '")
1621            .trim_end_matches('\'');
1622        assert!(
1623            !inner.contains('\''),
1624            "recipe quotes must be base64-encapsulated, never leak into the literal"
1625        );
1626    }
1627
1628    #[test]
1629    fn build_ssh_terminal_args_without_dir_skips_cd() {
1630        let params = crate::services::remote::ConnectionParams {
1631            user: Some("u".into()),
1632            host: "h".into(),
1633            port: None,
1634            identity_file: None,
1635            extra_args: Vec::new(),
1636        };
1637        let a = build_ssh_terminal_args(&params, None);
1638        assert_eq!(
1639            a,
1640            vec![
1641                "-t",
1642                "-o",
1643                "StrictHostKeyChecking=accept-new",
1644                "u@h",
1645                "exec ${SHELL:-/bin/sh} -l",
1646            ]
1647        );
1648        // Empty dir is treated the same as no dir.
1649        assert_eq!(build_ssh_terminal_args(&params, Some("")), a);
1650    }
1651
1652    #[test]
1653    fn build_ssh_agent_terminal_args_runs_agent_in_remote_workspace() {
1654        let params = crate::services::remote::ConnectionParams {
1655            user: Some("u".into()),
1656            host: "h".into(),
1657            port: Some(2222),
1658            identity_file: Some(std::path::PathBuf::from("/k")),
1659            extra_args: Vec::new(),
1660        };
1661        let argv = vec![
1662            "claude".to_string(),
1663            "--resume".to_string(),
1664            "u-1".to_string(),
1665        ];
1666        let a = build_ssh_agent_terminal_args(&params, Some("/srv/proj"), &argv);
1667
1668        // Same ssh head as the bare terminal: -t, StrictHostKeyChecking, port,
1669        // identity, then the target.
1670        assert_eq!(
1671            &a[..8],
1672            &[
1673                "-t",
1674                "-o",
1675                "StrictHostKeyChecking=accept-new",
1676                "-p",
1677                "2222",
1678                "-i",
1679                "/k",
1680                "u@h",
1681            ]
1682        );
1683        let remote_cmd = a.last().unwrap();
1684        // Lands in the workspace before exec'ing the agent.
1685        assert!(remote_cmd.contains("cd \"$d\"") && remote_cmd.contains("'/srv/proj'"));
1686        // Hands the agent to a remote *login* shell so its PATH resolves it,
1687        // then execs the (quoted) agent argv.
1688        assert!(remote_cmd.contains("exec ${SHELL:-/bin/sh} -lc "));
1689        assert!(
1690            remote_cmd.contains("claude")
1691                && remote_cmd.contains("--resume")
1692                && remote_cmd.contains("u-1")
1693        );
1694    }
1695
1696    #[test]
1697    fn build_ssh_agent_terminal_args_quotes_args_with_spaces() {
1698        // An argv token containing a space (or shell metacharacter) must be
1699        // POSIX-quoted so the remote shell parses it as a single argument
1700        // rather than splitting it.
1701        let params = crate::services::remote::ConnectionParams {
1702            user: None,
1703            host: "h".into(),
1704            port: None,
1705            identity_file: None,
1706            extra_args: Vec::new(),
1707        };
1708        let argv = vec!["agent".to_string(), "a b".to_string()];
1709        let remote_cmd = build_ssh_agent_terminal_args(&params, None, &argv)
1710            .pop()
1711            .unwrap();
1712        // The space-containing token appears single-quoted, never bare.
1713        assert!(remote_cmd.contains("'a b'"));
1714    }
1715
1716    #[test]
1717    fn build_kube_agent_terminal_args_runs_agent_in_pod_workspace() {
1718        let target = crate::services::remote::KubeTarget {
1719            context: None,
1720            namespace: "dev".into(),
1721            pod: "pod-1".into(),
1722            container: None,
1723            workspace: Some("/workspace".into()),
1724        };
1725        let argv = vec![
1726            "claude".to_string(),
1727            "--resume".to_string(),
1728            "u-1".to_string(),
1729        ];
1730        let a = build_kube_agent_terminal_args(&target, &[], &argv);
1731        // `kubectl exec -it … -- sh -lc '<remote_cmd>'`.
1732        assert_eq!(a[0], "exec");
1733        assert!(a.contains(&"-it".to_string()));
1734        assert!(a.contains(&"sh".to_string()) && a.contains(&"-lc".to_string()));
1735        let remote_cmd = a.last().unwrap();
1736        assert!(remote_cmd.contains("cd \"$d\"") && remote_cmd.contains("'/workspace'"));
1737        assert!(remote_cmd.contains("exec ${SHELL:-/bin/sh} -lc "));
1738        assert!(remote_cmd.contains("claude"));
1739    }
1740
1741    #[test]
1742    fn build_kube_terminal_args_allocates_tty_and_pins_cwd() {
1743        let target = crate::services::remote::KubeTarget {
1744            context: Some("prod".into()),
1745            namespace: "dev".into(),
1746            pod: "pod-1".into(),
1747            container: Some("app".into()),
1748            workspace: Some("/workspace".into()),
1749        };
1750        let a = build_kube_terminal_args(&target, &[]);
1751        let expected: Vec<String> = [
1752            "--context",
1753            "prod",
1754            "exec",
1755            "-it",
1756            "-n",
1757            "dev",
1758            "-c",
1759            "app",
1760            "pod-1",
1761            "--",
1762            "sh",
1763            "-lc",
1764            "d='/workspace'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
1765        ]
1766        .into_iter()
1767        .map(String::from)
1768        .collect();
1769        assert_eq!(a, expected);
1770    }
1771
1772    #[test]
1773    fn build_kube_terminal_args_exports_base_env_before_login_shell() {
1774        let target = crate::services::remote::KubeTarget {
1775            context: None,
1776            namespace: "dev".into(),
1777            pod: "pod-1".into(),
1778            container: None,
1779            workspace: None,
1780        };
1781        let base_env = vec![
1782            ("VIRTUAL_ENV".to_string(), "/c/.venv".to_string()),
1783            ("MSG".to_string(), "a b".to_string()),
1784        ];
1785        let a = build_kube_terminal_args(&target, &base_env);
1786        // The in-pod env probe is exported (POSIX-quoted) inside the sh -lc
1787        // wrapper, before handing off to the login shell.
1788        assert_eq!(
1789            a.last().unwrap(),
1790            "export VIRTUAL_ENV='/c/.venv'; export MSG='a b'; exec ${SHELL:-/bin/sh} -l"
1791        );
1792    }
1793
1794    #[test]
1795    fn build_kube_terminal_args_without_workspace_skips_cd() {
1796        let target = crate::services::remote::KubeTarget {
1797            context: None,
1798            namespace: "dev".into(),
1799            pod: "pod-1".into(),
1800            container: None,
1801            workspace: None,
1802        };
1803        let a = build_kube_terminal_args(&target, &[]);
1804        assert_eq!(
1805            a,
1806            vec![
1807                "exec",
1808                "-it",
1809                "-n",
1810                "dev",
1811                "pod-1",
1812                "--",
1813                "sh",
1814                "-lc",
1815                "exec ${SHELL:-/bin/sh} -l",
1816            ]
1817        );
1818    }
1819}