Skip to main content

fresh/services/remote/
spawner.rs

1//! Process spawner abstraction
2//!
3//! Provides a trait for spawning processes that works transparently on both
4//! local and remote hosts. Used by the Editor's SpawnProcess handler (for
5//! plugins like git_grep) and by FileProvider (for `git ls-files`).
6//!
7//! Two orthogonal traits live here:
8//!
9//! - [`ProcessSpawner`] — one-shot "run and collect" commands. Callers get
10//!   `{stdout, stderr, exit_code}` back once the child exits. Used by
11//!   plugin `spawnProcess`, find-in-files, `git ls-files`, etc.
12//! - [`LongRunningSpawner`] — long-lived stdio processes (LSP servers,
13//!   future tool agents). Callers get a [`StdioChild`] they can talk to
14//!   via piped stdin/stdout/stderr and kill explicitly. LSP servers route
15//!   through this so an authority pointing at a container runs the server
16//!   inside the container (via `docker exec -i`) instead of on the host.
17
18use crate::services::process_hidden::HideWindow;
19use crate::services::process_limits::PostSpawnAction;
20use crate::services::remote::channel::{AgentChannel, ChannelError};
21use crate::services::remote::protocol::{decode_base64, exec_params};
22use crate::types::ProcessLimits;
23use std::path::Path;
24use std::process::ExitStatus;
25use std::sync::Arc;
26use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
27
28/// Result of spawning a process
29#[derive(Debug, Clone)]
30pub struct SpawnResult {
31    pub stdout: String,
32    pub stderr: String,
33    pub exit_code: i32,
34}
35
36/// Error from spawning a process
37#[derive(Debug, thiserror::Error)]
38pub enum SpawnError {
39    #[error("Channel error: {0}")]
40    Channel(#[from] ChannelError),
41
42    #[error("Process error: {0}")]
43    Process(String),
44
45    #[error("Decode error: {0}")]
46    Decode(String),
47}
48
49/// Trait for spawning processes (local or remote)
50///
51/// This abstraction allows plugins and core features (like file discovery)
52/// to spawn processes transparently on either local or remote filesystems.
53#[async_trait::async_trait]
54pub trait ProcessSpawner: Send + Sync {
55    /// Spawn a process and wait for completion
56    async fn spawn(
57        &self,
58        command: String,
59        args: Vec<String>,
60        cwd: Option<String>,
61    ) -> Result<SpawnResult, SpawnError>;
62}
63
64/// Local process spawner using tokio
65///
66/// Used for local file editing (the default).
67pub struct LocalProcessSpawner;
68
69#[async_trait::async_trait]
70impl ProcessSpawner for LocalProcessSpawner {
71    async fn spawn(
72        &self,
73        command: String,
74        args: Vec<String>,
75        cwd: Option<String>,
76    ) -> Result<SpawnResult, SpawnError> {
77        let mut cmd = tokio::process::Command::new(&command);
78        cmd.args(&args);
79        cmd.hide_window();
80
81        if let Some(ref dir) = cwd {
82            cmd.current_dir(dir);
83        }
84
85        let output = cmd
86            .output()
87            .await
88            .map_err(|e| SpawnError::Process(e.to_string()))?;
89
90        Ok(SpawnResult {
91            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
92            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
93            exit_code: output.status.code().unwrap_or(-1),
94        })
95    }
96}
97
98/// Remote process spawner via SSH agent
99pub struct RemoteProcessSpawner {
100    channel: Arc<AgentChannel>,
101}
102
103impl RemoteProcessSpawner {
104    /// Create a new remote process spawner
105    pub fn new(channel: Arc<AgentChannel>) -> Self {
106        Self { channel }
107    }
108}
109
110#[async_trait::async_trait]
111impl ProcessSpawner for RemoteProcessSpawner {
112    async fn spawn(
113        &self,
114        command: String,
115        args: Vec<String>,
116        cwd: Option<String>,
117    ) -> Result<SpawnResult, SpawnError> {
118        let params = exec_params(&command, &args, cwd.as_deref());
119
120        // Use streaming request to get live output
121        let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
122
123        let mut stdout = Vec::new();
124        let mut stderr = Vec::new();
125
126        // Collect streaming output
127        while let Some(data) = data_rx.recv().await {
128            if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
129                if let Ok(decoded) = decode_base64(out) {
130                    stdout.extend_from_slice(&decoded);
131                }
132            }
133            if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
134                if let Ok(decoded) = decode_base64(err) {
135                    stderr.extend_from_slice(&decoded);
136                }
137            }
138        }
139
140        // Get final result
141        let result = result_rx
142            .await
143            .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
144            .map_err(SpawnError::Process)?;
145
146        let exit_code = result
147            .get("code")
148            .and_then(|v| v.as_i64())
149            .map(|c| c as i32)
150            .unwrap_or(-1);
151
152        Ok(SpawnResult {
153            stdout: String::from_utf8_lossy(&stdout).to_string(),
154            stderr: String::from_utf8_lossy(&stderr).to_string(),
155            exit_code,
156        })
157    }
158}
159
160/// A long-lived child process with piped stdio streams.
161///
162/// Wraps [`tokio::process::Child`] so the LSP code (and future callers
163/// like plugin-managed tool agents) doesn't reach into concrete process
164/// types — that way a container authority can transparently run the
165/// child through `docker exec -i` while the caller keeps talking to an
166/// ordinary stdin/stdout pair.
167///
168/// Streams are `Option`-wrapped so callers can [`Self::take_stdin`] /
169/// [`Self::take_stdout`] / [`Self::take_stderr`] into their own reader
170/// and writer tasks. After all streams are taken, the `StdioChild` is
171/// still useful for lifecycle control via [`Self::kill`] and
172/// [`Self::wait`].
173///
174/// `spawned_locally` tells callers whether `id()` names the real child
175/// process (true for local spawns) or an intermediate like `docker` /
176/// `ssh` (false). LSP's cgroup-attachment step keys off this — applying
177/// a cgroup to the `docker` CLI PID doesn't constrain the container-
178/// side server it exec'd.
179pub struct StdioChild {
180    inner: tokio::process::Child,
181    stdin: Option<ChildStdin>,
182    stdout: Option<ChildStdout>,
183    stderr: Option<ChildStderr>,
184    spawned_locally: bool,
185}
186
187impl StdioChild {
188    /// Construct a `StdioChild` from an already-spawned
189    /// `tokio::process::Child`. Pulls the piped streams out of the
190    /// child so callers can take them individually later.
191    ///
192    /// This constructor is for spawners that don't participate in
193    /// host-side resource limiting (the Docker variant is the
194    /// canonical example). Local spawners should prefer
195    /// [`Self::from_local_tokio_child`] so a `PostSpawnAction` produced
196    /// by [`ProcessLimits::apply_to_command`] is applied to the child's
197    /// PID before the spawner returns.
198    pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
199        let stdin = child.stdin.take();
200        let stdout = child.stdout.take();
201        let stderr = child.stderr.take();
202        Self {
203            inner: child,
204            stdin,
205            stdout,
206            stderr,
207            spawned_locally,
208        }
209    }
210
211    /// Construct a `StdioChild` for a locally-spawned child while
212    /// applying any host-side `PostSpawnAction` (cgroup attachment)
213    /// returned by [`ProcessLimits::apply_to_command`]. Best-effort:
214    /// failure to attach logs a warning but doesn't fail the spawn,
215    /// matching the pre-refactor behavior.
216    pub fn from_local_tokio_child(
217        child: tokio::process::Child,
218        post_spawn: PostSpawnAction,
219    ) -> Self {
220        let out = Self::from_tokio_child(child, true);
221        if let Some(pid) = out.inner.id() {
222            post_spawn.apply_to_child(pid);
223        }
224        out
225    }
226
227    /// Take the stdin stream. Returns `None` after the first call.
228    pub fn take_stdin(&mut self) -> Option<ChildStdin> {
229        self.stdin.take()
230    }
231
232    /// Take the stdout stream. Returns `None` after the first call.
233    pub fn take_stdout(&mut self) -> Option<ChildStdout> {
234        self.stdout.take()
235    }
236
237    /// Take the stderr stream. Returns `None` after the first call.
238    pub fn take_stderr(&mut self) -> Option<ChildStderr> {
239        self.stderr.take()
240    }
241
242    /// PID of the immediate child process. For local spawns this is
243    /// the LSP server itself; for docker/ssh this is the CLI wrapper.
244    /// Use [`Self::spawned_locally`] to tell which.
245    pub fn id(&self) -> Option<u32> {
246        self.inner.id()
247    }
248
249    /// `true` when the child PID names the real target process. Callers
250    /// that only apply host-side resource controls (cgroups, rlimits)
251    /// should skip their application when this is `false`.
252    pub fn spawned_locally(&self) -> bool {
253        self.spawned_locally
254    }
255
256    /// Request termination. Forwards to [`tokio::process::Child::kill`].
257    pub async fn kill(&mut self) -> std::io::Result<()> {
258        self.inner.kill().await
259    }
260
261    /// Await exit. Forwards to [`tokio::process::Child::wait`].
262    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
263        self.inner.wait().await
264    }
265}
266
267/// Spawner for long-lived stdio processes (LSP servers, tool agents).
268///
269/// Separate from [`ProcessSpawner`] because the APIs diverge in two
270/// ways that don't compose: [`ProcessSpawner::spawn`] awaits
271/// completion and returns collected output; callers of
272/// `LongRunningSpawner` need a live child they can read from and
273/// write to over time.
274///
275/// Authorities expose one of these alongside their filesystem and
276/// one-shot spawner. Routing LSP spawning through it is what gives
277/// container authorities in-container LSP without a special-cased
278/// branch in `LspHandle`.
279///
280/// Callers pass an optional [`ProcessLimits`] block so local spawners
281/// can honor host-side memory / CPU limits. Non-local variants (docker,
282/// ssh) don't have a meaningful way to impose host limits on their
283/// child — cgroups attached to the `docker` CLI PID don't reach into
284/// the container — and are expected to ignore them.
285#[async_trait::async_trait]
286pub trait LongRunningSpawner: Send + Sync {
287    /// Spawn `command` with `args` as a long-lived stdio child under
288    /// this authority. Stdin/stdout/stderr are piped so the caller can
289    /// hand them to dedicated reader/writer tasks. `limits`, when
290    /// provided, lets local spawners attach cgroups or `setrlimit`;
291    /// remote spawners are expected to ignore it (see trait docs).
292    async fn spawn_stdio(
293        &self,
294        command: &str,
295        args: &[String],
296        env: Vec<(String, String)>,
297        cwd: Option<&Path>,
298        limits: Option<&ProcessLimits>,
299    ) -> Result<StdioChild, SpawnError>;
300
301    /// Check whether `command` resolves to an executable under this
302    /// authority. Routed through the same spawner so an SSH authority
303    /// probes the remote `$PATH` and a container authority probes the
304    /// container's `$PATH` — unlike `which::which` which only ever sees
305    /// the host.
306    async fn command_exists(&self, command: &str) -> bool;
307}
308
309/// Local long-running spawner using `tokio::process::Command` directly.
310///
311/// Functionally equivalent to how `LspHandle::spawn` works today, but
312/// exposed through the trait so non-local authorities can substitute
313/// their own implementation without any LSP-side awareness. Applies
314/// any `ProcessLimits` passed in via the same machinery the
315/// pre-refactor LSP code used (`apply_to_command` + `apply_to_child`).
316pub struct LocalLongRunningSpawner;
317
318#[async_trait::async_trait]
319impl LongRunningSpawner for LocalLongRunningSpawner {
320    async fn spawn_stdio(
321        &self,
322        command: &str,
323        args: &[String],
324        env: Vec<(String, String)>,
325        cwd: Option<&Path>,
326        limits: Option<&ProcessLimits>,
327    ) -> Result<StdioChild, SpawnError> {
328        let mut cmd = tokio::process::Command::new(command);
329        cmd.args(args)
330            .envs(env)
331            .stdin(std::process::Stdio::piped())
332            .stdout(std::process::Stdio::piped())
333            .stderr(std::process::Stdio::piped())
334            .hide_window()
335            .kill_on_drop(true);
336        if let Some(dir) = cwd {
337            cmd.current_dir(dir);
338        }
339
340        // Apply pre-spawn hooks (cgroup path selection, setrlimit
341        // via `pre_exec`). Errors bubble up so callers see
342        // configuration problems early — matches the pre-refactor
343        // LSP behavior.
344        let post_spawn = match limits {
345            Some(lim) => lim
346                .apply_to_command(&mut cmd)
347                .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
348            None => PostSpawnAction::default(),
349        };
350
351        let child = cmd
352            .spawn()
353            .map_err(|e| SpawnError::Process(e.to_string()))?;
354        Ok(StdioChild::from_local_tokio_child(child, post_spawn))
355    }
356
357    async fn command_exists(&self, command: &str) -> bool {
358        which::which(command).is_ok()
359    }
360}
361
362#[cfg(test)]
363mod tests {
364    use super::*;
365    use tokio::io::AsyncReadExt;
366
367    #[tokio::test]
368    async fn test_local_spawner() {
369        let spawner = LocalProcessSpawner;
370        let result = spawner
371            .spawn("echo".to_string(), vec!["hello".to_string()], None)
372            .await
373            .unwrap();
374
375        assert_eq!(result.exit_code, 0);
376        assert!(result.stdout.trim() == "hello");
377    }
378
379    #[tokio::test]
380    async fn local_long_running_spawn_stdio_pipes_output() {
381        let spawner = LocalLongRunningSpawner;
382        let mut child = spawner
383            .spawn_stdio(
384                "sh",
385                &["-c".into(), "echo hi".into()],
386                Vec::new(),
387                None,
388                None,
389            )
390            .await
391            .expect("spawn succeeds");
392
393        let mut stdout = child.take_stdout().expect("stdout piped");
394        let mut buf = String::new();
395        stdout.read_to_string(&mut buf).await.unwrap();
396        assert_eq!(buf.trim(), "hi");
397
398        let status = child.wait().await.unwrap();
399        assert!(status.success());
400        assert!(child.spawned_locally());
401    }
402
403    #[tokio::test]
404    async fn local_long_running_command_exists_for_sh() {
405        let spawner = LocalLongRunningSpawner;
406        assert!(spawner.command_exists("sh").await);
407        assert!(
408            !spawner
409                .command_exists("fresh-unlikely-binary-name-ygzu9")
410                .await
411        );
412    }
413}