fresh/services/remote/spawner.rs
1//! Process spawner abstraction
2//!
3//! Provides a trait for spawning processes that works transparently on both
4//! local and remote hosts. Used by the Editor's SpawnProcess handler (for
5//! plugins like git_grep) and by FileProvider (for `git ls-files`).
6//!
7//! Two orthogonal traits live here:
8//!
9//! - [`ProcessSpawner`] — one-shot "run and collect" commands. Callers get
10//! `{stdout, stderr, exit_code}` back once the child exits. Used by
11//! plugin `spawnProcess`, find-in-files, `git ls-files`, etc.
12//! - [`LongRunningSpawner`] — long-lived stdio processes (LSP servers,
13//! future tool agents). Callers get a [`StdioChild`] they can talk to
14//! via piped stdin/stdout/stderr and kill explicitly. LSP servers route
15//! through this so an authority pointing at a container runs the server
16//! inside the container (via `docker exec -i`) instead of on the host.
17
18use crate::services::process_limits::PostSpawnAction;
19use crate::services::remote::channel::{AgentChannel, ChannelError};
20use crate::services::remote::protocol::{decode_base64, exec_params};
21use crate::types::ProcessLimits;
22use std::path::Path;
23use std::process::ExitStatus;
24use std::sync::Arc;
25use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
26
27/// Result of spawning a process
28#[derive(Debug, Clone)]
29pub struct SpawnResult {
30 pub stdout: String,
31 pub stderr: String,
32 pub exit_code: i32,
33}
34
35/// Error from spawning a process
36#[derive(Debug, thiserror::Error)]
37pub enum SpawnError {
38 #[error("Channel error: {0}")]
39 Channel(#[from] ChannelError),
40
41 #[error("Process error: {0}")]
42 Process(String),
43
44 #[error("Decode error: {0}")]
45 Decode(String),
46}
47
48/// Trait for spawning processes (local or remote)
49///
50/// This abstraction allows plugins and core features (like file discovery)
51/// to spawn processes transparently on either local or remote filesystems.
52#[async_trait::async_trait]
53pub trait ProcessSpawner: Send + Sync {
54 /// Spawn a process and wait for completion
55 async fn spawn(
56 &self,
57 command: String,
58 args: Vec<String>,
59 cwd: Option<String>,
60 ) -> Result<SpawnResult, SpawnError>;
61}
62
63/// Local process spawner using tokio
64///
65/// Used for local file editing (the default).
66pub struct LocalProcessSpawner;
67
68#[async_trait::async_trait]
69impl ProcessSpawner for LocalProcessSpawner {
70 async fn spawn(
71 &self,
72 command: String,
73 args: Vec<String>,
74 cwd: Option<String>,
75 ) -> Result<SpawnResult, SpawnError> {
76 let mut cmd = tokio::process::Command::new(&command);
77 cmd.args(&args);
78
79 if let Some(ref dir) = cwd {
80 cmd.current_dir(dir);
81 }
82
83 let output = cmd
84 .output()
85 .await
86 .map_err(|e| SpawnError::Process(e.to_string()))?;
87
88 Ok(SpawnResult {
89 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
90 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
91 exit_code: output.status.code().unwrap_or(-1),
92 })
93 }
94}
95
96/// Remote process spawner via SSH agent
97pub struct RemoteProcessSpawner {
98 channel: Arc<AgentChannel>,
99}
100
101impl RemoteProcessSpawner {
102 /// Create a new remote process spawner
103 pub fn new(channel: Arc<AgentChannel>) -> Self {
104 Self { channel }
105 }
106}
107
108#[async_trait::async_trait]
109impl ProcessSpawner for RemoteProcessSpawner {
110 async fn spawn(
111 &self,
112 command: String,
113 args: Vec<String>,
114 cwd: Option<String>,
115 ) -> Result<SpawnResult, SpawnError> {
116 let params = exec_params(&command, &args, cwd.as_deref());
117
118 // Use streaming request to get live output
119 let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
120
121 let mut stdout = Vec::new();
122 let mut stderr = Vec::new();
123
124 // Collect streaming output
125 while let Some(data) = data_rx.recv().await {
126 if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
127 if let Ok(decoded) = decode_base64(out) {
128 stdout.extend_from_slice(&decoded);
129 }
130 }
131 if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
132 if let Ok(decoded) = decode_base64(err) {
133 stderr.extend_from_slice(&decoded);
134 }
135 }
136 }
137
138 // Get final result
139 let result = result_rx
140 .await
141 .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
142 .map_err(SpawnError::Process)?;
143
144 let exit_code = result
145 .get("code")
146 .and_then(|v| v.as_i64())
147 .map(|c| c as i32)
148 .unwrap_or(-1);
149
150 Ok(SpawnResult {
151 stdout: String::from_utf8_lossy(&stdout).to_string(),
152 stderr: String::from_utf8_lossy(&stderr).to_string(),
153 exit_code,
154 })
155 }
156}
157
158/// A long-lived child process with piped stdio streams.
159///
160/// Wraps [`tokio::process::Child`] so the LSP code (and future callers
161/// like plugin-managed tool agents) doesn't reach into concrete process
162/// types — that way a container authority can transparently run the
163/// child through `docker exec -i` while the caller keeps talking to an
164/// ordinary stdin/stdout pair.
165///
166/// Streams are `Option`-wrapped so callers can [`Self::take_stdin`] /
167/// [`Self::take_stdout`] / [`Self::take_stderr`] into their own reader
168/// and writer tasks. After all streams are taken, the `StdioChild` is
169/// still useful for lifecycle control via [`Self::kill`] and
170/// [`Self::wait`].
171///
172/// `spawned_locally` tells callers whether `id()` names the real child
173/// process (true for local spawns) or an intermediate like `docker` /
174/// `ssh` (false). LSP's cgroup-attachment step keys off this — applying
175/// a cgroup to the `docker` CLI PID doesn't constrain the container-
176/// side server it exec'd.
177pub struct StdioChild {
178 inner: tokio::process::Child,
179 stdin: Option<ChildStdin>,
180 stdout: Option<ChildStdout>,
181 stderr: Option<ChildStderr>,
182 spawned_locally: bool,
183}
184
185impl StdioChild {
186 /// Construct a `StdioChild` from an already-spawned
187 /// `tokio::process::Child`. Pulls the piped streams out of the
188 /// child so callers can take them individually later.
189 ///
190 /// This constructor is for spawners that don't participate in
191 /// host-side resource limiting (the Docker variant is the
192 /// canonical example). Local spawners should prefer
193 /// [`Self::from_local_tokio_child`] so a `PostSpawnAction` produced
194 /// by [`ProcessLimits::apply_to_command`] is applied to the child's
195 /// PID before the spawner returns.
196 pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
197 let stdin = child.stdin.take();
198 let stdout = child.stdout.take();
199 let stderr = child.stderr.take();
200 Self {
201 inner: child,
202 stdin,
203 stdout,
204 stderr,
205 spawned_locally,
206 }
207 }
208
209 /// Construct a `StdioChild` for a locally-spawned child while
210 /// applying any host-side `PostSpawnAction` (cgroup attachment)
211 /// returned by [`ProcessLimits::apply_to_command`]. Best-effort:
212 /// failure to attach logs a warning but doesn't fail the spawn,
213 /// matching the pre-refactor behavior.
214 pub fn from_local_tokio_child(
215 child: tokio::process::Child,
216 post_spawn: PostSpawnAction,
217 ) -> Self {
218 let out = Self::from_tokio_child(child, true);
219 if let Some(pid) = out.inner.id() {
220 post_spawn.apply_to_child(pid);
221 }
222 out
223 }
224
225 /// Take the stdin stream. Returns `None` after the first call.
226 pub fn take_stdin(&mut self) -> Option<ChildStdin> {
227 self.stdin.take()
228 }
229
230 /// Take the stdout stream. Returns `None` after the first call.
231 pub fn take_stdout(&mut self) -> Option<ChildStdout> {
232 self.stdout.take()
233 }
234
235 /// Take the stderr stream. Returns `None` after the first call.
236 pub fn take_stderr(&mut self) -> Option<ChildStderr> {
237 self.stderr.take()
238 }
239
240 /// PID of the immediate child process. For local spawns this is
241 /// the LSP server itself; for docker/ssh this is the CLI wrapper.
242 /// Use [`Self::spawned_locally`] to tell which.
243 pub fn id(&self) -> Option<u32> {
244 self.inner.id()
245 }
246
247 /// `true` when the child PID names the real target process. Callers
248 /// that only apply host-side resource controls (cgroups, rlimits)
249 /// should skip their application when this is `false`.
250 pub fn spawned_locally(&self) -> bool {
251 self.spawned_locally
252 }
253
254 /// Request termination. Forwards to [`tokio::process::Child::kill`].
255 pub async fn kill(&mut self) -> std::io::Result<()> {
256 self.inner.kill().await
257 }
258
259 /// Await exit. Forwards to [`tokio::process::Child::wait`].
260 pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
261 self.inner.wait().await
262 }
263}
264
265/// Spawner for long-lived stdio processes (LSP servers, tool agents).
266///
267/// Separate from [`ProcessSpawner`] because the APIs diverge in two
268/// ways that don't compose: [`ProcessSpawner::spawn`] awaits
269/// completion and returns collected output; callers of
270/// `LongRunningSpawner` need a live child they can read from and
271/// write to over time.
272///
273/// Authorities expose one of these alongside their filesystem and
274/// one-shot spawner. Routing LSP spawning through it is what gives
275/// container authorities in-container LSP without a special-cased
276/// branch in `LspHandle`.
277///
278/// Callers pass an optional [`ProcessLimits`] block so local spawners
279/// can honor host-side memory / CPU limits. Non-local variants (docker,
280/// ssh) don't have a meaningful way to impose host limits on their
281/// child — cgroups attached to the `docker` CLI PID don't reach into
282/// the container — and are expected to ignore them.
283#[async_trait::async_trait]
284pub trait LongRunningSpawner: Send + Sync {
285 /// Spawn `command` with `args` as a long-lived stdio child under
286 /// this authority. Stdin/stdout/stderr are piped so the caller can
287 /// hand them to dedicated reader/writer tasks. `limits`, when
288 /// provided, lets local spawners attach cgroups or `setrlimit`;
289 /// remote spawners are expected to ignore it (see trait docs).
290 async fn spawn_stdio(
291 &self,
292 command: &str,
293 args: &[String],
294 env: Vec<(String, String)>,
295 cwd: Option<&Path>,
296 limits: Option<&ProcessLimits>,
297 ) -> Result<StdioChild, SpawnError>;
298
299 /// Check whether `command` resolves to an executable under this
300 /// authority. Routed through the same spawner so an SSH authority
301 /// probes the remote `$PATH` and a container authority probes the
302 /// container's `$PATH` — unlike `which::which` which only ever sees
303 /// the host.
304 async fn command_exists(&self, command: &str) -> bool;
305}
306
307/// Local long-running spawner using `tokio::process::Command` directly.
308///
309/// Functionally equivalent to how `LspHandle::spawn` works today, but
310/// exposed through the trait so non-local authorities can substitute
311/// their own implementation without any LSP-side awareness. Applies
312/// any `ProcessLimits` passed in via the same machinery the
313/// pre-refactor LSP code used (`apply_to_command` + `apply_to_child`).
314pub struct LocalLongRunningSpawner;
315
316#[async_trait::async_trait]
317impl LongRunningSpawner for LocalLongRunningSpawner {
318 async fn spawn_stdio(
319 &self,
320 command: &str,
321 args: &[String],
322 env: Vec<(String, String)>,
323 cwd: Option<&Path>,
324 limits: Option<&ProcessLimits>,
325 ) -> Result<StdioChild, SpawnError> {
326 let mut cmd = tokio::process::Command::new(command);
327 cmd.args(args)
328 .envs(env)
329 .stdin(std::process::Stdio::piped())
330 .stdout(std::process::Stdio::piped())
331 .stderr(std::process::Stdio::piped())
332 .kill_on_drop(true);
333 if let Some(dir) = cwd {
334 cmd.current_dir(dir);
335 }
336
337 // Apply pre-spawn hooks (cgroup path selection, setrlimit
338 // via `pre_exec`). Errors bubble up so callers see
339 // configuration problems early — matches the pre-refactor
340 // LSP behavior.
341 let post_spawn = match limits {
342 Some(lim) => lim
343 .apply_to_command(&mut cmd)
344 .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
345 None => PostSpawnAction::default(),
346 };
347
348 let child = cmd
349 .spawn()
350 .map_err(|e| SpawnError::Process(e.to_string()))?;
351 Ok(StdioChild::from_local_tokio_child(child, post_spawn))
352 }
353
354 async fn command_exists(&self, command: &str) -> bool {
355 which::which(command).is_ok()
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use super::*;
362 use tokio::io::AsyncReadExt;
363
364 #[tokio::test]
365 async fn test_local_spawner() {
366 let spawner = LocalProcessSpawner;
367 let result = spawner
368 .spawn("echo".to_string(), vec!["hello".to_string()], None)
369 .await
370 .unwrap();
371
372 assert_eq!(result.exit_code, 0);
373 assert!(result.stdout.trim() == "hello");
374 }
375
376 #[tokio::test]
377 async fn local_long_running_spawn_stdio_pipes_output() {
378 let spawner = LocalLongRunningSpawner;
379 let mut child = spawner
380 .spawn_stdio(
381 "sh",
382 &["-c".into(), "echo hi".into()],
383 Vec::new(),
384 None,
385 None,
386 )
387 .await
388 .expect("spawn succeeds");
389
390 let mut stdout = child.take_stdout().expect("stdout piped");
391 let mut buf = String::new();
392 stdout.read_to_string(&mut buf).await.unwrap();
393 assert_eq!(buf.trim(), "hi");
394
395 let status = child.wait().await.unwrap();
396 assert!(status.success());
397 assert!(child.spawned_locally());
398 }
399
400 #[tokio::test]
401 async fn local_long_running_command_exists_for_sh() {
402 let spawner = LocalLongRunningSpawner;
403 assert!(spawner.command_exists("sh").await);
404 assert!(
405 !spawner
406 .command_exists("fresh-unlikely-binary-name-ygzu9")
407 .await
408 );
409 }
410}