Skip to main content

fresh/services/remote/
spawner.rs

//! Process spawner abstraction
//!
//! Provides traits for spawning processes that work transparently on both
//! local and remote hosts. Used by the Editor's SpawnProcess handler (for
//! plugins like git_grep) and by FileProvider (for `git ls-files`).
//!
//! Two orthogonal traits live here:
//!
//! - [`ProcessSpawner`] — one-shot "run and collect" commands. Callers get
//!   `{stdout, stderr, exit_code}` back once the child exits. Used by
//!   plugin `spawnProcess`, find-in-files, `git ls-files`, etc.
//! - [`LongRunningSpawner`] — long-lived stdio processes (LSP servers,
//!   future tool agents). Callers get a [`StdioChild`] they can talk to
//!   via piped stdin/stdout/stderr and kill explicitly. LSP servers route
//!   through this so an authority pointing at a container runs the server
//!   inside the container (via `docker exec -i`) instead of on the host.
17
18use crate::services::env_provider::EnvProvider;
19use crate::services::process_hidden::HideWindow;
20use crate::services::process_limits::PostSpawnAction;
21use crate::services::remote::channel::{AgentChannel, ChannelError};
22use crate::services::remote::protocol::{decode_base64, exec_params};
23use crate::services::workspace_trust::{gate, WorkspaceTrust};
24use crate::types::ProcessLimits;
25
26/// Capture the active environment for a *local* host: run the provider's
27/// capture script through `$SHELL -lc …` as a raw subprocess (no env applied —
28/// this is how the env is established, so it must not recurse). Returns the
29/// captured `KEY=VALUE` pairs, or empty when inactive / on failure.
30async fn local_captured_env(provider: &EnvProvider) -> Vec<(String, String)> {
31    provider
32        .current(|script| async move {
33            let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
34            let output = tokio::process::Command::new(&shell)
35                .arg("-lc")
36                .arg(&script)
37                .hide_window()
38                .output()
39                .await
40                .ok()?;
41            Some(String::from_utf8_lossy(&output.stdout).into_owned())
42        })
43        .await
44}
45use std::path::Path;
46use std::process::ExitStatus;
47use std::sync::Arc;
48use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
49
/// Collected outcome of a one-shot process run.
#[derive(Debug, Clone)]
pub struct SpawnResult {
    /// Text the process wrote to standard output.
    pub stdout: String,
    /// Text the process wrote to standard error.
    pub stderr: String,
    /// Exit code of the process. The spawners in this file report `-1`
    /// when no code is available.
    pub exit_code: i32,
}
57
58/// Error from spawning a process
59#[derive(Debug, thiserror::Error)]
60pub enum SpawnError {
61    #[error("Channel error: {0}")]
62    Channel(#[from] ChannelError),
63
64    #[error("Process error: {0}")]
65    Process(String),
66
67    #[error("Decode error: {0}")]
68    Decode(String),
69}
70
71/// Trait for spawning processes (local or remote)
72///
73/// This abstraction allows plugins and core features (like file discovery)
74/// to spawn processes transparently on either local or remote filesystems.
75#[async_trait::async_trait]
76pub trait ProcessSpawner: Send + Sync {
77    /// Spawn a process and wait for completion
78    async fn spawn(
79        &self,
80        command: String,
81        args: Vec<String>,
82        cwd: Option<String>,
83    ) -> Result<SpawnResult, SpawnError>;
84
85    /// Spawn a process, piping stdout directly to a file instead of
86    /// buffering it in memory. Default impl buffers and writes; concrete
87    /// implementations should override when a streaming path exists.
88    ///
89    /// `SpawnResult.stdout` is empty on success — the bytes are on disk
90    /// at `stdout_to` instead. `stderr` and `exit_code` work as usual.
91    async fn spawn_to_file(
92        &self,
93        command: String,
94        args: Vec<String>,
95        cwd: Option<String>,
96        stdout_to: std::path::PathBuf,
97    ) -> Result<SpawnResult, SpawnError> {
98        // Fallback: collect in memory then write. Concrete impls override
99        // to pipe directly.
100        let result = self.spawn(command, args, cwd).await?;
101        if result.exit_code == 0 || !result.stdout.is_empty() {
102            std::fs::write(&stdout_to, result.stdout.as_bytes())
103                .map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
104        }
105        Ok(SpawnResult {
106            stdout: String::new(),
107            stderr: result.stderr,
108            exit_code: result.exit_code,
109        })
110    }
111
112    /// Spawn a process that can be cancelled mid-flight via a oneshot
113    /// receiver. When `stdout_to` is `Some`, stdout streams to the file;
114    /// when `None`, it's buffered into `SpawnResult.stdout`.
115    ///
116    /// If `kill_rx` fires before the child exits, the child is killed and
117    /// the result reflects the killed exit status.
118    ///
119    /// Default impl ignores `kill_rx` (no true cancellation for backends
120    /// that buffer in memory). Local override implements real kill.
121    async fn spawn_cancellable(
122        &self,
123        command: String,
124        args: Vec<String>,
125        cwd: Option<String>,
126        stdout_to: Option<std::path::PathBuf>,
127        _kill_rx: tokio::sync::oneshot::Receiver<()>,
128    ) -> Result<SpawnResult, SpawnError> {
129        match stdout_to {
130            Some(p) => self.spawn_to_file(command, args, cwd, p).await,
131            None => self.spawn(command, args, cwd).await,
132        }
133    }
134}
135
136/// Local process spawner using tokio.
137///
138/// Used for local file editing (the default). The optional `env` is layered
139/// onto every child's environment (under the inherited process env) — this is
140/// how an activated environment manager (venv / direnv / mise) injects its
141/// captured variables into one-shot spawns. Empty for the plain default.
142pub struct LocalProcessSpawner {
143    env: Arc<EnvProvider>,
144    trust: Arc<WorkspaceTrust>,
145}
146
147impl LocalProcessSpawner {
148    /// Local spawner gated by `trust`, applying the live `env` provider's
149    /// captured environment to every child.
150    pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
151        Self { env, trust }
152    }
153
154    async fn apply_env(&self, cmd: &mut tokio::process::Command) {
155        let env = local_captured_env(&self.env).await;
156        if !env.is_empty() {
157            cmd.envs(env.iter().map(|(k, v)| (k.as_str(), v.as_str())));
158        }
159    }
160}
161
162#[async_trait::async_trait]
163impl ProcessSpawner for LocalProcessSpawner {
164    async fn spawn(
165        &self,
166        command: String,
167        args: Vec<String>,
168        cwd: Option<String>,
169    ) -> Result<SpawnResult, SpawnError> {
170        gate(&self.trust, &command, cwd.as_deref())?;
171        let mut cmd = tokio::process::Command::new(&command);
172        cmd.args(&args);
173        self.apply_env(&mut cmd).await;
174        cmd.hide_window();
175
176        if let Some(ref dir) = cwd {
177            cmd.current_dir(dir);
178        }
179
180        let output = cmd
181            .output()
182            .await
183            .map_err(|e| SpawnError::Process(e.to_string()))?;
184
185        Ok(SpawnResult {
186            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
187            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
188            exit_code: output.status.code().unwrap_or(-1),
189        })
190    }
191
192    /// Cancellable streaming spawn. Handles both `stdout_to = Some(path)`
193    /// (pipe stdout to file) and `stdout_to = None` (buffer in memory),
194    /// with kill support via `kill_rx`.
195    async fn spawn_cancellable(
196        &self,
197        command: String,
198        args: Vec<String>,
199        cwd: Option<String>,
200        stdout_to: Option<std::path::PathBuf>,
201        kill_rx: tokio::sync::oneshot::Receiver<()>,
202    ) -> Result<SpawnResult, SpawnError> {
203        use std::process::Stdio;
204        use tokio::io::AsyncReadExt;
205
206        gate(&self.trust, &command, cwd.as_deref())?;
207        let mut cmd = tokio::process::Command::new(&command);
208        cmd.args(&args);
209        self.apply_env(&mut cmd).await;
210        cmd.hide_window();
211        cmd.stdout(Stdio::piped());
212        cmd.stderr(Stdio::piped());
213        if let Some(ref dir) = cwd {
214            cmd.current_dir(dir);
215        }
216
217        // For file-output mode, ensure parent dir exists. Surface the
218        // failure as a SpawnError rather than silently dropping — if we
219        // can't make the dir, the File::create below would just fail
220        // with a less informative error.
221        if let Some(ref path) = stdout_to {
222            if let Some(parent) = path.parent() {
223                if !parent.as_os_str().is_empty() {
224                    tokio::fs::create_dir_all(parent).await.map_err(|e| {
225                        SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
226                    })?;
227                }
228            }
229        }
230
231        let mut child = cmd
232            .spawn()
233            .map_err(|e| SpawnError::Process(e.to_string()))?;
234
235        let mut child_stdout = child
236            .stdout
237            .take()
238            .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
239        let mut child_stderr = child
240            .stderr
241            .take()
242            .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
243
244        // Drain stdout (to file or buffer) and stderr concurrently —
245        // both must be drained or the child can stall on a full pipe.
246        let stdout_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> = match stdout_to {
247            Some(path) => tokio::spawn(async move {
248                let mut file = tokio::fs::File::create(&path).await?;
249                tokio::io::copy(&mut child_stdout, &mut file).await?;
250                use tokio::io::AsyncWriteExt;
251                // flush + sync are best-effort durability so a reader
252                // opening the file right after spawn resolves sees all
253                // bytes. The actual write happened in `copy` above; a
254                // flush error here only loses the durability hint.
255                if let Err(e) = file.flush().await {
256                    tracing::warn!("spawn_cancellable: file flush failed: {}", e);
257                }
258                if let Err(e) = file.sync_all().await {
259                    tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
260                }
261                Ok(Vec::new())
262            }),
263            None => tokio::spawn(async move {
264                let mut buf = Vec::new();
265                child_stdout.read_to_end(&mut buf).await?;
266                Ok(buf)
267            }),
268        };
269        let stderr_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> =
270            tokio::spawn(async move {
271                let mut buf = Vec::new();
272                child_stderr.read_to_end(&mut buf).await?;
273                Ok(buf)
274            });
275
276        // Race child.wait() against kill_rx so the dispatcher can kill
277        // mid-stream (e.g. user scrolled past the commit before git
278        // finished).
279        let exit_code = tokio::select! {
280            status = child.wait() => status
281                .map(|s| s.code().unwrap_or(-1))
282                .unwrap_or(-1),
283            _ = kill_rx => {
284                // start_kill fails only when the process has already
285                // exited — and we're about to `wait()` to reap either
286                // way, so the failure path collapses with the success
287                // path. Log at debug for diagnostic visibility.
288                if let Err(e) = child.start_kill() {
289                    tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
290                }
291                child.wait().await.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1)
292            }
293        };
294
295        // Both drain tasks must finish; on kill they get EOF when the
296        // child's pipes close.
297        let stdout_bytes = stdout_task
298            .await
299            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
300            .map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
301        let stderr_bytes = stderr_task
302            .await
303            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
304            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
305
306        Ok(SpawnResult {
307            stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
308            stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
309            exit_code,
310        })
311    }
312
313    /// Streaming override: pipe child stdout straight into `stdout_to`
314    /// via `tokio::io::copy`. The 43 MB stdout of `git show` for the
315    /// bun-rust-rewrite commit never lands in a single `String`.
316    async fn spawn_to_file(
317        &self,
318        command: String,
319        args: Vec<String>,
320        cwd: Option<String>,
321        stdout_to: std::path::PathBuf,
322    ) -> Result<SpawnResult, SpawnError> {
323        use std::process::Stdio;
324        use tokio::io::AsyncWriteExt;
325
326        gate(&self.trust, &command, cwd.as_deref())?;
327        let mut cmd = tokio::process::Command::new(&command);
328        cmd.args(&args);
329        self.apply_env(&mut cmd).await;
330        cmd.hide_window();
331        cmd.stdout(Stdio::piped());
332        cmd.stderr(Stdio::piped());
333        if let Some(ref dir) = cwd {
334            cmd.current_dir(dir);
335        }
336
337        // Ensure the parent dir exists so the open below doesn't ENOENT.
338        // Surface failures rather than letting File::create error with a
339        // less informative message.
340        if let Some(parent) = stdout_to.parent() {
341            if !parent.as_os_str().is_empty() {
342                tokio::fs::create_dir_all(parent).await.map_err(|e| {
343                    SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
344                })?;
345            }
346        }
347
348        let mut file = tokio::fs::File::create(&stdout_to)
349            .await
350            .map_err(|e| SpawnError::Process(format!("create {:?}: {}", stdout_to, e)))?;
351
352        let mut child = cmd
353            .spawn()
354            .map_err(|e| SpawnError::Process(e.to_string()))?;
355
356        let mut child_stdout = child
357            .stdout
358            .take()
359            .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
360        let mut child_stderr = child
361            .stderr
362            .take()
363            .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
364
365        // Copy stdout to file and drain stderr concurrently. Both ends
366        // must be drained or the child can stall on a full pipe.
367        let stdout_task = tokio::spawn(async move {
368            let res = tokio::io::copy(&mut child_stdout, &mut file).await;
369            // flush + sync are best-effort durability so a reader
370            // opening the file right after spawn resolves sees all
371            // bytes. The data was already written in `copy` above; a
372            // flush error here only loses the durability hint.
373            if let Err(e) = file.flush().await {
374                tracing::warn!("spawn_to_file: file flush failed: {}", e);
375            }
376            if let Err(e) = file.sync_all().await {
377                tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
378            }
379            res
380        });
381        let stderr_task = tokio::spawn(async move {
382            let mut buf = Vec::new();
383            let res = tokio::io::copy(&mut child_stderr, &mut buf).await;
384            res.map(|_| buf)
385        });
386
387        let status = child
388            .wait()
389            .await
390            .map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;
391
392        // Drop the empty Vec from the streaming task — its only signal
393        // is success/failure of the io::copy and flush, propagated via
394        // the `?` operator.
395        stdout_task
396            .await
397            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
398            .map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
399        let stderr_bytes = stderr_task
400            .await
401            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
402            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
403
404        Ok(SpawnResult {
405            stdout: String::new(),
406            stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
407            exit_code: status.code().unwrap_or(-1),
408        })
409    }
410}
411
/// Prefix `(command, args)` with the captured `env` for backends that take an
/// argv array (SSH agent / docker) instead of a shell string.
///
/// With a non-empty `env` the result is the program `env` with the argv
/// `K=V … command args…`; with an empty `env` the inputs are returned as-is.
/// `command` must not contain `=`, because `env` would read it as one more
/// assignment rather than as the program to run.
fn env_wrap(env: &[(String, String)], command: &str, args: &[String]) -> (String, Vec<String>) {
    if env.is_empty() {
        return (command.to_owned(), args.to_vec());
    }

    // Assignments first, then the real program, then its own arguments.
    let argv: Vec<String> = env
        .iter()
        .map(|(key, value)| format!("{key}={value}"))
        .chain(std::iter::once(command.to_owned()))
        .chain(args.iter().cloned())
        .collect();

    (String::from("env"), argv)
}
429
430/// Remote process spawner via SSH agent
431pub struct RemoteProcessSpawner {
432    channel: Arc<AgentChannel>,
433    env: Arc<EnvProvider>,
434    trust: Arc<WorkspaceTrust>,
435}
436
437impl RemoteProcessSpawner {
438    /// Create a new remote process spawner gated by `trust`, applying the live
439    /// `env` provider (captured on the remote host) to every spawn.
440    pub fn new(
441        channel: Arc<AgentChannel>,
442        env: Arc<EnvProvider>,
443        trust: Arc<WorkspaceTrust>,
444    ) -> Self {
445        Self {
446            channel,
447            env,
448            trust,
449        }
450    }
451
452    /// Capture the active env on the *remote* host by running the provider's
453    /// script through the agent's raw `exec` (no env applied — recursion-free).
454    async fn captured_env(&self) -> Vec<(String, String)> {
455        let channel = self.channel.clone();
456        self.env
457            .current(move |script| async move {
458                let params = exec_params("sh", &["-lc".to_string(), script], None);
459                let (mut data_rx, _result) =
460                    channel.request_streaming("exec", params).await.ok()?;
461                let mut stdout = Vec::new();
462                while let Some(d) = data_rx.recv().await {
463                    if let Some(out) = d.get("out").and_then(|v| v.as_str()) {
464                        if let Ok(b) = decode_base64(out) {
465                            stdout.extend_from_slice(&b);
466                        }
467                    }
468                }
469                Some(String::from_utf8_lossy(&stdout).into_owned())
470            })
471            .await
472    }
473}
474
475#[async_trait::async_trait]
476impl ProcessSpawner for RemoteProcessSpawner {
477    async fn spawn(
478        &self,
479        command: String,
480        args: Vec<String>,
481        cwd: Option<String>,
482    ) -> Result<SpawnResult, SpawnError> {
483        gate(&self.trust, &command, cwd.as_deref())?;
484        let captured = self.captured_env().await;
485        let (eff_cmd, eff_args) = env_wrap(&captured, &command, &args);
486        let params = exec_params(&eff_cmd, &eff_args, cwd.as_deref());
487
488        // Use streaming request to get live output
489        let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
490
491        let mut stdout = Vec::new();
492        let mut stderr = Vec::new();
493
494        // Collect streaming output
495        while let Some(data) = data_rx.recv().await {
496            if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
497                if let Ok(decoded) = decode_base64(out) {
498                    stdout.extend_from_slice(&decoded);
499                }
500            }
501            if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
502                if let Ok(decoded) = decode_base64(err) {
503                    stderr.extend_from_slice(&decoded);
504                }
505            }
506        }
507
508        // Get final result
509        let result = result_rx
510            .await
511            .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
512            .map_err(SpawnError::Process)?;
513
514        let exit_code = result
515            .get("code")
516            .and_then(|v| v.as_i64())
517            .map(|c| c as i32)
518            .unwrap_or(-1);
519
520        Ok(SpawnResult {
521            stdout: String::from_utf8_lossy(&stdout).to_string(),
522            stderr: String::from_utf8_lossy(&stderr).to_string(),
523            exit_code,
524        })
525    }
526
527    async fn spawn_to_file(
528        &self,
529        _command: String,
530        _args: Vec<String>,
531        _cwd: Option<String>,
532        _stdout_to: std::path::PathBuf,
533    ) -> Result<SpawnResult, SpawnError> {
534        Err(SpawnError::Process(
535            "stdoutTo is not supported for remote processes".to_string(),
536        ))
537    }
538}
539
540/// A long-lived child process with piped stdio streams.
541///
542/// Wraps [`tokio::process::Child`] so the LSP code (and future callers
543/// like plugin-managed tool agents) doesn't reach into concrete process
544/// types — that way a container authority can transparently run the
545/// child through `docker exec -i` while the caller keeps talking to an
546/// ordinary stdin/stdout pair.
547///
548/// Streams are `Option`-wrapped so callers can [`Self::take_stdin`] /
549/// [`Self::take_stdout`] / [`Self::take_stderr`] into their own reader
550/// and writer tasks. After all streams are taken, the `StdioChild` is
551/// still useful for lifecycle control via [`Self::kill`] and
552/// [`Self::wait`].
553///
554/// `spawned_locally` tells callers whether `id()` names the real child
555/// process (true for local spawns) or an intermediate like `docker` /
556/// `ssh` (false). LSP's cgroup-attachment step keys off this — applying
557/// a cgroup to the `docker` CLI PID doesn't constrain the container-
558/// side server it exec'd.
559pub struct StdioChild {
560    inner: tokio::process::Child,
561    stdin: Option<ChildStdin>,
562    stdout: Option<ChildStdout>,
563    stderr: Option<ChildStderr>,
564    spawned_locally: bool,
565}
566
567impl StdioChild {
568    /// Construct a `StdioChild` from an already-spawned
569    /// `tokio::process::Child`. Pulls the piped streams out of the
570    /// child so callers can take them individually later.
571    ///
572    /// This constructor is for spawners that don't participate in
573    /// host-side resource limiting (the Docker variant is the
574    /// canonical example). Local spawners should prefer
575    /// [`Self::from_local_tokio_child`] so a `PostSpawnAction` produced
576    /// by [`ProcessLimits::apply_to_command`] is applied to the child's
577    /// PID before the spawner returns.
578    pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
579        let stdin = child.stdin.take();
580        let stdout = child.stdout.take();
581        let stderr = child.stderr.take();
582        Self {
583            inner: child,
584            stdin,
585            stdout,
586            stderr,
587            spawned_locally,
588        }
589    }
590
591    /// Construct a `StdioChild` for a locally-spawned child while
592    /// applying any host-side `PostSpawnAction` (cgroup attachment)
593    /// returned by [`ProcessLimits::apply_to_command`]. Best-effort:
594    /// failure to attach logs a warning but doesn't fail the spawn,
595    /// matching the pre-refactor behavior.
596    pub fn from_local_tokio_child(
597        child: tokio::process::Child,
598        post_spawn: PostSpawnAction,
599    ) -> Self {
600        let out = Self::from_tokio_child(child, true);
601        if let Some(pid) = out.inner.id() {
602            post_spawn.apply_to_child(pid);
603        }
604        out
605    }
606
607    /// Take the stdin stream. Returns `None` after the first call.
608    pub fn take_stdin(&mut self) -> Option<ChildStdin> {
609        self.stdin.take()
610    }
611
612    /// Take the stdout stream. Returns `None` after the first call.
613    pub fn take_stdout(&mut self) -> Option<ChildStdout> {
614        self.stdout.take()
615    }
616
617    /// Take the stderr stream. Returns `None` after the first call.
618    pub fn take_stderr(&mut self) -> Option<ChildStderr> {
619        self.stderr.take()
620    }
621
622    /// PID of the immediate child process. For local spawns this is
623    /// the LSP server itself; for docker/ssh this is the CLI wrapper.
624    /// Use [`Self::spawned_locally`] to tell which.
625    pub fn id(&self) -> Option<u32> {
626        self.inner.id()
627    }
628
629    /// `true` when the child PID names the real target process. Callers
630    /// that only apply host-side resource controls (cgroups, rlimits)
631    /// should skip their application when this is `false`.
632    pub fn spawned_locally(&self) -> bool {
633        self.spawned_locally
634    }
635
636    /// Request termination. Forwards to [`tokio::process::Child::kill`].
637    pub async fn kill(&mut self) -> std::io::Result<()> {
638        self.inner.kill().await
639    }
640
641    /// Await exit. Forwards to [`tokio::process::Child::wait`].
642    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
643        self.inner.wait().await
644    }
645}
646
647/// Spawner for long-lived stdio processes (LSP servers, tool agents).
648///
649/// Separate from [`ProcessSpawner`] because the APIs diverge in two
650/// ways that don't compose: [`ProcessSpawner::spawn`] awaits
651/// completion and returns collected output; callers of
652/// `LongRunningSpawner` need a live child they can read from and
653/// write to over time.
654///
655/// Authorities expose one of these alongside their filesystem and
656/// one-shot spawner. Routing LSP spawning through it is what gives
657/// container authorities in-container LSP without a special-cased
658/// branch in `LspHandle`.
659///
660/// Callers pass an optional [`ProcessLimits`] block so local spawners
661/// can honor host-side memory / CPU limits. Non-local variants (docker,
662/// ssh) don't have a meaningful way to impose host limits on their
663/// child — cgroups attached to the `docker` CLI PID don't reach into
664/// the container — and are expected to ignore them.
665#[async_trait::async_trait]
666pub trait LongRunningSpawner: Send + Sync {
667    /// Spawn `command` with `args` as a long-lived stdio child under
668    /// this authority. Stdin/stdout/stderr are piped so the caller can
669    /// hand them to dedicated reader/writer tasks. `limits`, when
670    /// provided, lets local spawners attach cgroups or `setrlimit`;
671    /// remote spawners are expected to ignore it (see trait docs).
672    async fn spawn_stdio(
673        &self,
674        command: &str,
675        args: &[String],
676        env: Vec<(String, String)>,
677        cwd: Option<&Path>,
678        limits: Option<&ProcessLimits>,
679    ) -> Result<StdioChild, SpawnError>;
680
681    /// Check whether `command` resolves to an executable under this
682    /// authority. Routed through the same spawner so an SSH authority
683    /// probes the remote `$PATH` and a container authority probes the
684    /// container's `$PATH` — unlike `which::which` which only ever sees
685    /// the host.
686    async fn command_exists(&self, command: &str) -> bool;
687}
688
689/// Local long-running spawner using `tokio::process::Command` directly.
690///
691/// Functionally equivalent to how `LspHandle::spawn` works today, but
692/// exposed through the trait so non-local authorities can substitute
693/// their own implementation without any LSP-side awareness. Applies
694/// any `ProcessLimits` passed in via the same machinery the
695/// pre-refactor LSP code used (`apply_to_command` + `apply_to_child`).
696pub struct LocalLongRunningSpawner {
697    env: Arc<EnvProvider>,
698    trust: Arc<WorkspaceTrust>,
699}
700
701impl LocalLongRunningSpawner {
702    /// Local long-running spawner gated by `trust`, applying the live `env`
703    /// provider's captured environment under each child's environment.
704    pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
705        Self { env, trust }
706    }
707}
708
709#[async_trait::async_trait]
710impl LongRunningSpawner for LocalLongRunningSpawner {
711    async fn spawn_stdio(
712        &self,
713        command: &str,
714        args: &[String],
715        env: Vec<(String, String)>,
716        cwd: Option<&Path>,
717        limits: Option<&ProcessLimits>,
718    ) -> Result<StdioChild, SpawnError> {
719        gate(
720            &self.trust,
721            command,
722            cwd.map(|p| p.to_string_lossy()).as_deref(),
723        )?;
724        let captured = local_captured_env(&self.env).await;
725        let mut cmd = tokio::process::Command::new(command);
726        cmd.args(args)
727            // Provider env first, then the per-call env so the caller wins.
728            .envs(captured.iter().map(|(k, v)| (k.as_str(), v.as_str())))
729            .envs(env)
730            .stdin(std::process::Stdio::piped())
731            .stdout(std::process::Stdio::piped())
732            .stderr(std::process::Stdio::piped())
733            .hide_window()
734            .kill_on_drop(true);
735        if let Some(dir) = cwd {
736            cmd.current_dir(dir);
737        }
738
739        // Apply pre-spawn hooks (cgroup path selection, setrlimit
740        // via `pre_exec`). Errors bubble up so callers see
741        // configuration problems early — matches the pre-refactor
742        // LSP behavior.
743        let post_spawn = match limits {
744            Some(lim) => lim
745                .apply_to_command(&mut cmd)
746                .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
747            None => PostSpawnAction::default(),
748        };
749
750        let child = cmd
751            .spawn()
752            .map_err(|e| SpawnError::Process(e.to_string()))?;
753        Ok(StdioChild::from_local_tokio_child(child, post_spawn))
754    }
755
756    async fn command_exists(&self, command: &str) -> bool {
757        // Honor the active env's PATH (e.g. a venv's `bin/`) so the existence
758        // probe searches the same place `spawn_stdio` will — otherwise a
759        // repo-local `pyright`/`ruff` looks missing and the server never
760        // starts. Falls back to the process PATH when no env is active.
761        let captured = local_captured_env(&self.env).await;
762        if let Some((_, path)) = captured.iter().find(|(k, _)| k == "PATH") {
763            let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
764            return which::which_in(command, Some(path), &cwd).is_ok();
765        }
766        which::which(command).is_ok()
767    }
768}
769
/// POSIX shell single-quote: wrap `s` in `'…'`, escaping each embedded single
/// quote as `'\''` (close the quote, emit an escaped quote, reopen). The
/// result is a single shell word whose value is exactly `s`, so it is safe to
/// splice into a remote command string.
fn shell_quote(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('\'');
    for c in s.chars() {
        if c == '\'' {
            out.push_str("'\\''");
        } else {
            out.push(c);
        }
    }
    out.push('\'');
    out
}

/// Whether `name` is a valid POSIX shell variable name: non-empty, ASCII
/// letters / digits / `_` only, and not starting with a digit. Only such
/// names can be written bare in a shell command or passed to `export`.
fn is_shell_var_name(name: &str) -> bool {
    let mut bytes = name.bytes();
    match bytes.next() {
        Some(first) if first == b'_' || first.is_ascii_alphabetic() => {
            bytes.all(|b| b == b'_' || b.is_ascii_alphanumeric())
        }
        _ => false,
    }
}

/// Build the remote shell command for a long-running process:
/// `[cd <cwd> && ]exec env K=V… <command> <args…>` (all shell-quoted).
/// `exec` replaces the login shell so the server *is* the SSH channel's
/// process — EOF/kill propagate to it directly. `env` applies the injected
/// environment then execs the real binary.
///
/// * `env` — `(name, value)` pairs, emitted in order as `env` operands.
/// * `cwd` — optional remote working directory to `cd` into first.
/// * `command` / `args` — the program and its arguments, each quoted as one
///   shell word.
///
/// Variable names that are plain identifiers are emitted bare. Any other name
/// is single-quoted so its characters can never be interpreted by the remote
/// shell; the `env` utility still receives the same `NAME=value` operand.
fn build_remote_exec(
    env: &[(String, String)],
    cwd: Option<&str>,
    command: &str,
    args: &[String],
) -> String {
    let mut s = String::new();
    if let Some(dir) = cwd {
        s.push_str("cd ");
        s.push_str(&shell_quote(dir));
        s.push_str(" && ");
    }
    s.push_str("exec ");
    if !env.is_empty() {
        s.push_str("env ");
        for (k, v) in env {
            if is_shell_var_name(k) {
                s.push_str(k);
            } else {
                // Unusual name (e.g. contains `%`, spaces or `;`): quote it so
                // it stays a literal part of the `NAME=value` operand.
                s.push_str(&shell_quote(k));
            }
            s.push('=');
            s.push_str(&shell_quote(v));
            s.push(' ');
        }
    }
    s.push_str(&shell_quote(command));
    for a in args {
        s.push(' ');
        s.push_str(&shell_quote(a));
    }
    s
}

/// Build the remote command for an existence probe. `command -v` is a shell
/// builtin (not a binary), so the env is applied via `export` assignments
/// rather than the `env` binary. The probe's output is discarded; only its
/// exit status matters.
///
/// Entries whose name is not a valid shell identifier are skipped: `export`
/// cannot set them, and because `export` is a special builtin a failed
/// assignment may abort a POSIX shell before `command -v` ever runs, making an
/// installed command look missing.
fn build_remote_command_exists(env: &[(String, String)], command: &str) -> String {
    let mut s = String::new();
    for (k, v) in env.iter().filter(|(k, _)| is_shell_var_name(k)) {
        s.push_str("export ");
        s.push_str(k);
        s.push('=');
        s.push_str(&shell_quote(v));
        s.push_str("; ");
    }
    s.push_str("command -v ");
    s.push_str(&shell_quote(command));
    s.push_str(" >/dev/null 2>&1");
    s
}
838
839/// Assemble the `ssh` argv: connection options, `user@host`, then the single
840/// remote command string (ssh concatenates trailing args with spaces, so the
841/// command must already be one shell-quoted string). Mirrors the options the
842/// agent connection uses.
843fn build_ssh_args(
844    params: &crate::services::remote::ConnectionParams,
845    remote_cmd: &str,
846) -> Vec<String> {
847    let mut a = vec![
848        "-o".to_string(),
849        "StrictHostKeyChecking=accept-new".to_string(),
850        "-o".to_string(),
851        "BatchMode=yes".to_string(),
852    ];
853    if let Some(port) = params.port {
854        a.push("-p".to_string());
855        a.push(port.to_string());
856    }
857    if let Some(ref identity) = params.identity_file {
858        a.push("-i".to_string());
859        a.push(identity.to_string_lossy().into_owned());
860    }
861    a.push(format!("{}@{}", params.user, params.host));
862    a.push(remote_cmd.to_string());
863    a
864}
865
866/// Long-running spawner over SSH: each LSP server (or tool agent) gets its own
867/// `ssh user@host <remote-cmd>` subprocess, whose piped stdio *is* the remote
868/// process's stdio. Returning a real local [`tokio::process::Child`] (the ssh
869/// client) means the LSP I/O layer talks to ordinary `ChildStdin`/`ChildStdout`
870/// with no awareness it's remote — the same trick the Docker spawner uses with
871/// the local `docker` CLI.
872///
873/// This opens a separate SSH connection per server rather than multiplexing
874/// through the agent: the agent's one-shot `exec` can't keep a process alive
875/// with writable stdin, and abstracting `StdioChild` / the whole LSP I/O layer
876/// over the agent channel would be a far larger change. The tradeoff is extra
877/// SSH connections; the win is LSP that actually runs on the remote host
878/// instead of the host-local fallback.
879pub struct RemoteLongRunningSpawner {
880    params: crate::services::remote::ConnectionParams,
881    env: Arc<EnvProvider>,
882    trust: Arc<WorkspaceTrust>,
883}
884
885impl RemoteLongRunningSpawner {
886    /// Spawner for `params`, gated by `trust`, applying the live `env` provider
887    /// (captured on the remote host) to every server it launches.
888    pub fn new(
889        params: crate::services::remote::ConnectionParams,
890        env: Arc<EnvProvider>,
891        trust: Arc<WorkspaceTrust>,
892    ) -> Self {
893        Self { params, env, trust }
894    }
895
896    /// Capture the active env on the *remote* host: run the provider's script
897    /// through a one-shot `ssh … <script>` (raw — no env applied).
898    async fn captured_env(&self) -> Vec<(String, String)> {
899        let params = self.params.clone();
900        self.env
901            .current(move |script| async move {
902                let ssh_args = build_ssh_args(&params, &script);
903                let output = tokio::process::Command::new("ssh")
904                    .args(&ssh_args)
905                    .hide_window()
906                    .output()
907                    .await
908                    .ok()?;
909                Some(String::from_utf8_lossy(&output.stdout).into_owned())
910            })
911            .await
912    }
913}
914
915#[async_trait::async_trait]
916impl LongRunningSpawner for RemoteLongRunningSpawner {
917    async fn spawn_stdio(
918        &self,
919        command: &str,
920        args: &[String],
921        env: Vec<(String, String)>,
922        cwd: Option<&Path>,
923        _limits: Option<&ProcessLimits>,
924    ) -> Result<StdioChild, SpawnError> {
925        // Host-side process limits don't reach a remote process (the local
926        // PID is the ssh client), so `_limits` is ignored — same as Docker.
927        let cwd_str = cwd.map(|p| p.to_string_lossy().into_owned());
928        gate(&self.trust, command, cwd_str.as_deref())?;
929
930        // Captured (provider) env first, then the per-call env so the caller
931        // wins on conflict (mirrors the local layering).
932        let mut merged = self.captured_env().await;
933        merged.extend(env);
934
935        let remote = build_remote_exec(&merged, cwd_str.as_deref(), command, args);
936        let ssh_args = build_ssh_args(&self.params, &remote);
937
938        let mut cmd = tokio::process::Command::new("ssh");
939        cmd.args(&ssh_args)
940            .stdin(std::process::Stdio::piped())
941            .stdout(std::process::Stdio::piped())
942            .stderr(std::process::Stdio::piped())
943            .hide_window()
944            .kill_on_drop(true);
945
946        let child = cmd
947            .spawn()
948            .map_err(|e| SpawnError::Process(e.to_string()))?;
949        // `spawned_locally = false`: the local PID is the ssh client, not the
950        // remote server, so host-only resource controls skip themselves.
951        Ok(StdioChild::from_tokio_child(child, false))
952    }
953
954    async fn command_exists(&self, command: &str) -> bool {
955        let captured = self.captured_env().await;
956        let remote = build_remote_command_exists(&captured, command);
957        let ssh_args = build_ssh_args(&self.params, &remote);
958        match tokio::process::Command::new("ssh")
959            .args(&ssh_args)
960            .hide_window()
961            .output()
962            .await
963        {
964            Ok(output) => output.status.success(),
965            Err(_) => false,
966        }
967    }
968}
969
970#[cfg(test)]
971mod tests {
972    use super::*;
973    use tokio::io::AsyncReadExt;
974
975    #[tokio::test]
976    async fn test_local_spawner() {
977        let spawner = LocalProcessSpawner::new(
978            Arc::new(EnvProvider::inactive()),
979            Arc::new(WorkspaceTrust::permissive()),
980        );
981        let result = spawner
982            .spawn("echo".to_string(), vec!["hello".to_string()], None)
983            .await
984            .unwrap();
985
986        assert_eq!(result.exit_code, 0);
987        assert!(result.stdout.trim() == "hello");
988    }
989
990    #[tokio::test]
991    async fn test_local_spawner_stdout_to_file() {
992        let spawner = LocalProcessSpawner::new(
993            Arc::new(EnvProvider::inactive()),
994            Arc::new(WorkspaceTrust::permissive()),
995        );
996        let tmp =
997            std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
998        // Best-effort cleanup of any leftover from a previous run.
999        // Failure (e.g. NotFound) is fine — the spawn below will
1000        // create the file fresh.
1001        #[allow(clippy::let_underscore_must_use)]
1002        let _ = std::fs::remove_file(&tmp);
1003        let result = spawner
1004            .spawn_to_file(
1005                "echo".to_string(),
1006                vec!["hello-from-disk".to_string()],
1007                None,
1008                tmp.clone(),
1009            )
1010            .await
1011            .unwrap();
1012
1013        assert_eq!(result.exit_code, 0);
1014        assert!(
1015            result.stdout.is_empty(),
1016            "stdout should be empty when streaming"
1017        );
1018        let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
1019        assert_eq!(contents.trim(), "hello-from-disk");
1020        // Best-effort cleanup — leaving a temp file behind on
1021        // failure is acceptable and the next run's pre-cleanup
1022        // handles it.
1023        #[allow(clippy::let_underscore_must_use)]
1024        let _ = std::fs::remove_file(&tmp);
1025    }
1026
1027    #[tokio::test]
1028    async fn test_local_spawner_cancellable_kill() {
1029        let spawner = LocalProcessSpawner::new(
1030            Arc::new(EnvProvider::inactive()),
1031            Arc::new(WorkspaceTrust::permissive()),
1032        );
1033        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
1034
1035        // Start a sleep that would take 30s normally; fire kill after 100ms.
1036        let task = tokio::spawn(async move {
1037            spawner
1038                .spawn_cancellable(
1039                    "sleep".to_string(),
1040                    vec!["30".to_string()],
1041                    None,
1042                    None,
1043                    kill_rx,
1044                )
1045                .await
1046        });
1047
1048        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1049        // Fire the kill. Err means the receiver was dropped (task
1050        // already finished), which would mean the 30s sleep returned
1051        // promptly on its own — impossible in this test window, but
1052        // not worth a panic either way; the subsequent task.await
1053        // surfaces any real problem.
1054        #[allow(clippy::let_underscore_must_use)]
1055        let _ = kill_tx.send(());
1056
1057        let start = std::time::Instant::now();
1058        let result = task.await.unwrap().unwrap();
1059        let elapsed = start.elapsed();
1060
1061        // SIGKILL'd sleep on Unix returns exit_code 137 or -1 (no code).
1062        // The point is we returned promptly, not after 30s.
1063        assert!(
1064            elapsed < std::time::Duration::from_secs(5),
1065            "kill should be prompt, took {:?}",
1066            elapsed
1067        );
1068        assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
1069    }
1070
1071    #[tokio::test]
1072    async fn local_long_running_spawn_stdio_pipes_output() {
1073        let spawner = LocalLongRunningSpawner::new(
1074            Arc::new(EnvProvider::inactive()),
1075            Arc::new(WorkspaceTrust::permissive()),
1076        );
1077        let mut child = spawner
1078            .spawn_stdio(
1079                "sh",
1080                &["-c".into(), "echo hi".into()],
1081                Vec::new(),
1082                None,
1083                None,
1084            )
1085            .await
1086            .expect("spawn succeeds");
1087
1088        let mut stdout = child.take_stdout().expect("stdout piped");
1089        let mut buf = String::new();
1090        stdout.read_to_string(&mut buf).await.unwrap();
1091        assert_eq!(buf.trim(), "hi");
1092
1093        let status = child.wait().await.unwrap();
1094        assert!(status.success());
1095        assert!(child.spawned_locally());
1096    }
1097
1098    #[tokio::test]
1099    async fn local_long_running_command_exists_for_sh() {
1100        let spawner = LocalLongRunningSpawner::new(
1101            Arc::new(EnvProvider::inactive()),
1102            Arc::new(WorkspaceTrust::permissive()),
1103        );
1104        assert!(spawner.command_exists("sh").await);
1105        assert!(
1106            !spawner
1107                .command_exists("fresh-unlikely-binary-name-ygzu9")
1108                .await
1109        );
1110    }
1111
1112    // Unix-only: the local env capture runs the recipe through a POSIX login
1113    // shell (`$SHELL -lc`, falling back to `/bin/sh`). On Windows there is no
1114    // such shell, so capture intentionally no-ops — there's nothing to assert.
1115    #[cfg(unix)]
1116    #[tokio::test]
1117    async fn local_spawner_applies_active_env_provider() {
1118        // Full local path: an active provider whose snippet exports a var →
1119        // captured via the login shell → injected into the spawned child.
1120        let env = Arc::new(EnvProvider::inactive());
1121        env.set("export FRESH_ENV_TEST=hi-from-provider".into(), None);
1122        let spawner = LocalProcessSpawner::new(env, Arc::new(WorkspaceTrust::permissive()));
1123        let result = spawner
1124            .spawn(
1125                "sh".into(),
1126                vec!["-c".into(), "printf %s \"$FRESH_ENV_TEST\"".into()],
1127                None,
1128            )
1129            .await
1130            .unwrap();
1131        assert_eq!(result.exit_code, 0);
1132        assert_eq!(result.stdout, "hi-from-provider");
1133    }
1134
1135    #[tokio::test]
1136    async fn local_spawner_inactive_provider_injects_nothing() {
1137        let spawner = LocalProcessSpawner::new(
1138            Arc::new(EnvProvider::inactive()),
1139            Arc::new(WorkspaceTrust::permissive()),
1140        );
1141        let result = spawner
1142            .spawn(
1143                "sh".into(),
1144                vec!["-c".into(), "printf %s \"${FRESH_ENV_TEST:-unset}\"".into()],
1145                None,
1146            )
1147            .await
1148            .unwrap();
1149        assert_eq!(result.stdout, "unset");
1150    }
1151
1152    // --- RemoteLongRunningSpawner command builders (pure, no SSH needed) ---
1153
1154    #[test]
1155    fn shell_quote_wraps_and_escapes() {
1156        assert_eq!(shell_quote("abc"), "'abc'");
1157        assert_eq!(shell_quote("a b/c"), "'a b/c'");
1158        assert_eq!(shell_quote("a'b"), "'a'\\''b'");
1159    }
1160
1161    #[test]
1162    fn build_remote_exec_with_cwd_and_env() {
1163        let env = vec![("VIRTUAL_ENV".to_string(), "/proj/.venv".to_string())];
1164        let s = build_remote_exec(&env, Some("/proj dir"), "python", &["x.py".to_string()]);
1165        assert_eq!(
1166            s,
1167            "cd '/proj dir' && exec env VIRTUAL_ENV='/proj/.venv' 'python' 'x.py'"
1168        );
1169    }
1170
1171    #[test]
1172    fn build_remote_exec_minimal() {
1173        assert_eq!(build_remote_exec(&[], None, "gopls", &[]), "exec 'gopls'");
1174    }
1175
1176    #[test]
1177    fn build_remote_command_exists_exports_env() {
1178        let env = vec![("PATH".to_string(), "/proj/.venv/bin:/usr/bin".to_string())];
1179        assert_eq!(
1180            build_remote_command_exists(&env, "pyright"),
1181            "export PATH='/proj/.venv/bin:/usr/bin'; command -v 'pyright' >/dev/null 2>&1"
1182        );
1183    }
1184
1185    #[test]
1186    fn build_ssh_args_full() {
1187        let params = crate::services::remote::ConnectionParams {
1188            user: "u".into(),
1189            host: "h".into(),
1190            port: Some(2222),
1191            identity_file: Some(std::path::PathBuf::from("/k")),
1192        };
1193        let a = build_ssh_args(&params, "echo hi");
1194        let expected: Vec<String> = [
1195            "-o",
1196            "StrictHostKeyChecking=accept-new",
1197            "-o",
1198            "BatchMode=yes",
1199            "-p",
1200            "2222",
1201            "-i",
1202            "/k",
1203            "u@h",
1204            "echo hi",
1205        ]
1206        .into_iter()
1207        .map(String::from)
1208        .collect();
1209        assert_eq!(a, expected);
1210    }
1211}