fresh/services/remote/spawner.rs
1//! Process spawner abstraction
2//!
3//! Provides a trait for spawning processes that works transparently on both
4//! local and remote hosts. Used by the Editor's SpawnProcess handler (for
5//! plugins like git_grep) and by FileProvider (for `git ls-files`).
6//!
7//! Two orthogonal traits live here:
8//!
9//! - [`ProcessSpawner`] — one-shot "run and collect" commands. Callers get
10//! `{stdout, stderr, exit_code}` back once the child exits. Used by
11//! plugin `spawnProcess`, find-in-files, `git ls-files`, etc.
12//! - [`LongRunningSpawner`] — long-lived stdio processes (LSP servers,
13//! future tool agents). Callers get a [`StdioChild`] they can talk to
14//! via piped stdin/stdout/stderr and kill explicitly. LSP servers route
15//! through this so an authority pointing at a container runs the server
16//! inside the container (via `docker exec -i`) instead of on the host.
17
18use crate::services::process_hidden::HideWindow;
19use crate::services::process_limits::PostSpawnAction;
20use crate::services::remote::channel::{AgentChannel, ChannelError};
21use crate::services::remote::protocol::{decode_base64, exec_params};
22use crate::types::ProcessLimits;
23use std::path::Path;
24use std::process::ExitStatus;
25use std::sync::Arc;
26use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
27
/// Collected outcome of a process that was run to completion.
#[derive(Debug, Clone)]
pub struct SpawnResult {
    /// Text the process wrote to standard output.
    pub stdout: String,
    /// Text the process wrote to standard error.
    pub stderr: String,
    /// Exit code of the process. The spawners in this file report `-1`
    /// when no exit code is available.
    pub exit_code: i32,
}
35
36/// Error from spawning a process
37#[derive(Debug, thiserror::Error)]
38pub enum SpawnError {
39 #[error("Channel error: {0}")]
40 Channel(#[from] ChannelError),
41
42 #[error("Process error: {0}")]
43 Process(String),
44
45 #[error("Decode error: {0}")]
46 Decode(String),
47}
48
49/// Trait for spawning processes (local or remote)
50///
51/// This abstraction allows plugins and core features (like file discovery)
52/// to spawn processes transparently on either local or remote filesystems.
53#[async_trait::async_trait]
54pub trait ProcessSpawner: Send + Sync {
55 /// Spawn a process and wait for completion
56 async fn spawn(
57 &self,
58 command: String,
59 args: Vec<String>,
60 cwd: Option<String>,
61 ) -> Result<SpawnResult, SpawnError>;
62}
63
64/// Local process spawner using tokio
65///
66/// Used for local file editing (the default).
67pub struct LocalProcessSpawner;
68
69#[async_trait::async_trait]
70impl ProcessSpawner for LocalProcessSpawner {
71 async fn spawn(
72 &self,
73 command: String,
74 args: Vec<String>,
75 cwd: Option<String>,
76 ) -> Result<SpawnResult, SpawnError> {
77 let mut cmd = tokio::process::Command::new(&command);
78 cmd.args(&args);
79 cmd.hide_window();
80
81 if let Some(ref dir) = cwd {
82 cmd.current_dir(dir);
83 }
84
85 let output = cmd
86 .output()
87 .await
88 .map_err(|e| SpawnError::Process(e.to_string()))?;
89
90 Ok(SpawnResult {
91 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
92 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
93 exit_code: output.status.code().unwrap_or(-1),
94 })
95 }
96}
97
98/// Remote process spawner via SSH agent
99pub struct RemoteProcessSpawner {
100 channel: Arc<AgentChannel>,
101}
102
103impl RemoteProcessSpawner {
104 /// Create a new remote process spawner
105 pub fn new(channel: Arc<AgentChannel>) -> Self {
106 Self { channel }
107 }
108}
109
110#[async_trait::async_trait]
111impl ProcessSpawner for RemoteProcessSpawner {
112 async fn spawn(
113 &self,
114 command: String,
115 args: Vec<String>,
116 cwd: Option<String>,
117 ) -> Result<SpawnResult, SpawnError> {
118 let params = exec_params(&command, &args, cwd.as_deref());
119
120 // Use streaming request to get live output
121 let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
122
123 let mut stdout = Vec::new();
124 let mut stderr = Vec::new();
125
126 // Collect streaming output
127 while let Some(data) = data_rx.recv().await {
128 if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
129 if let Ok(decoded) = decode_base64(out) {
130 stdout.extend_from_slice(&decoded);
131 }
132 }
133 if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
134 if let Ok(decoded) = decode_base64(err) {
135 stderr.extend_from_slice(&decoded);
136 }
137 }
138 }
139
140 // Get final result
141 let result = result_rx
142 .await
143 .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
144 .map_err(SpawnError::Process)?;
145
146 let exit_code = result
147 .get("code")
148 .and_then(|v| v.as_i64())
149 .map(|c| c as i32)
150 .unwrap_or(-1);
151
152 Ok(SpawnResult {
153 stdout: String::from_utf8_lossy(&stdout).to_string(),
154 stderr: String::from_utf8_lossy(&stderr).to_string(),
155 exit_code,
156 })
157 }
158}
159
160/// A long-lived child process with piped stdio streams.
161///
162/// Wraps [`tokio::process::Child`] so the LSP code (and future callers
163/// like plugin-managed tool agents) doesn't reach into concrete process
164/// types — that way a container authority can transparently run the
165/// child through `docker exec -i` while the caller keeps talking to an
166/// ordinary stdin/stdout pair.
167///
168/// Streams are `Option`-wrapped so callers can [`Self::take_stdin`] /
169/// [`Self::take_stdout`] / [`Self::take_stderr`] into their own reader
170/// and writer tasks. After all streams are taken, the `StdioChild` is
171/// still useful for lifecycle control via [`Self::kill`] and
172/// [`Self::wait`].
173///
174/// `spawned_locally` tells callers whether `id()` names the real child
175/// process (true for local spawns) or an intermediate like `docker` /
176/// `ssh` (false). LSP's cgroup-attachment step keys off this — applying
177/// a cgroup to the `docker` CLI PID doesn't constrain the container-
178/// side server it exec'd.
179pub struct StdioChild {
180 inner: tokio::process::Child,
181 stdin: Option<ChildStdin>,
182 stdout: Option<ChildStdout>,
183 stderr: Option<ChildStderr>,
184 spawned_locally: bool,
185}
186
187impl StdioChild {
188 /// Construct a `StdioChild` from an already-spawned
189 /// `tokio::process::Child`. Pulls the piped streams out of the
190 /// child so callers can take them individually later.
191 ///
192 /// This constructor is for spawners that don't participate in
193 /// host-side resource limiting (the Docker variant is the
194 /// canonical example). Local spawners should prefer
195 /// [`Self::from_local_tokio_child`] so a `PostSpawnAction` produced
196 /// by [`ProcessLimits::apply_to_command`] is applied to the child's
197 /// PID before the spawner returns.
198 pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
199 let stdin = child.stdin.take();
200 let stdout = child.stdout.take();
201 let stderr = child.stderr.take();
202 Self {
203 inner: child,
204 stdin,
205 stdout,
206 stderr,
207 spawned_locally,
208 }
209 }
210
211 /// Construct a `StdioChild` for a locally-spawned child while
212 /// applying any host-side `PostSpawnAction` (cgroup attachment)
213 /// returned by [`ProcessLimits::apply_to_command`]. Best-effort:
214 /// failure to attach logs a warning but doesn't fail the spawn,
215 /// matching the pre-refactor behavior.
216 pub fn from_local_tokio_child(
217 child: tokio::process::Child,
218 post_spawn: PostSpawnAction,
219 ) -> Self {
220 let out = Self::from_tokio_child(child, true);
221 if let Some(pid) = out.inner.id() {
222 post_spawn.apply_to_child(pid);
223 }
224 out
225 }
226
227 /// Take the stdin stream. Returns `None` after the first call.
228 pub fn take_stdin(&mut self) -> Option<ChildStdin> {
229 self.stdin.take()
230 }
231
232 /// Take the stdout stream. Returns `None` after the first call.
233 pub fn take_stdout(&mut self) -> Option<ChildStdout> {
234 self.stdout.take()
235 }
236
237 /// Take the stderr stream. Returns `None` after the first call.
238 pub fn take_stderr(&mut self) -> Option<ChildStderr> {
239 self.stderr.take()
240 }
241
242 /// PID of the immediate child process. For local spawns this is
243 /// the LSP server itself; for docker/ssh this is the CLI wrapper.
244 /// Use [`Self::spawned_locally`] to tell which.
245 pub fn id(&self) -> Option<u32> {
246 self.inner.id()
247 }
248
249 /// `true` when the child PID names the real target process. Callers
250 /// that only apply host-side resource controls (cgroups, rlimits)
251 /// should skip their application when this is `false`.
252 pub fn spawned_locally(&self) -> bool {
253 self.spawned_locally
254 }
255
256 /// Request termination. Forwards to [`tokio::process::Child::kill`].
257 pub async fn kill(&mut self) -> std::io::Result<()> {
258 self.inner.kill().await
259 }
260
261 /// Await exit. Forwards to [`tokio::process::Child::wait`].
262 pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
263 self.inner.wait().await
264 }
265}
266
267/// Spawner for long-lived stdio processes (LSP servers, tool agents).
268///
269/// Separate from [`ProcessSpawner`] because the APIs diverge in two
270/// ways that don't compose: [`ProcessSpawner::spawn`] awaits
271/// completion and returns collected output; callers of
272/// `LongRunningSpawner` need a live child they can read from and
273/// write to over time.
274///
275/// Authorities expose one of these alongside their filesystem and
276/// one-shot spawner. Routing LSP spawning through it is what gives
277/// container authorities in-container LSP without a special-cased
278/// branch in `LspHandle`.
279///
280/// Callers pass an optional [`ProcessLimits`] block so local spawners
281/// can honor host-side memory / CPU limits. Non-local variants (docker,
282/// ssh) don't have a meaningful way to impose host limits on their
283/// child — cgroups attached to the `docker` CLI PID don't reach into
284/// the container — and are expected to ignore them.
285#[async_trait::async_trait]
286pub trait LongRunningSpawner: Send + Sync {
287 /// Spawn `command` with `args` as a long-lived stdio child under
288 /// this authority. Stdin/stdout/stderr are piped so the caller can
289 /// hand them to dedicated reader/writer tasks. `limits`, when
290 /// provided, lets local spawners attach cgroups or `setrlimit`;
291 /// remote spawners are expected to ignore it (see trait docs).
292 async fn spawn_stdio(
293 &self,
294 command: &str,
295 args: &[String],
296 env: Vec<(String, String)>,
297 cwd: Option<&Path>,
298 limits: Option<&ProcessLimits>,
299 ) -> Result<StdioChild, SpawnError>;
300
301 /// Check whether `command` resolves to an executable under this
302 /// authority. Routed through the same spawner so an SSH authority
303 /// probes the remote `$PATH` and a container authority probes the
304 /// container's `$PATH` — unlike `which::which` which only ever sees
305 /// the host.
306 async fn command_exists(&self, command: &str) -> bool;
307}
308
309/// Local long-running spawner using `tokio::process::Command` directly.
310///
311/// Functionally equivalent to how `LspHandle::spawn` works today, but
312/// exposed through the trait so non-local authorities can substitute
313/// their own implementation without any LSP-side awareness. Applies
314/// any `ProcessLimits` passed in via the same machinery the
315/// pre-refactor LSP code used (`apply_to_command` + `apply_to_child`).
316pub struct LocalLongRunningSpawner;
317
318#[async_trait::async_trait]
319impl LongRunningSpawner for LocalLongRunningSpawner {
320 async fn spawn_stdio(
321 &self,
322 command: &str,
323 args: &[String],
324 env: Vec<(String, String)>,
325 cwd: Option<&Path>,
326 limits: Option<&ProcessLimits>,
327 ) -> Result<StdioChild, SpawnError> {
328 let mut cmd = tokio::process::Command::new(command);
329 cmd.args(args)
330 .envs(env)
331 .stdin(std::process::Stdio::piped())
332 .stdout(std::process::Stdio::piped())
333 .stderr(std::process::Stdio::piped())
334 .hide_window()
335 .kill_on_drop(true);
336 if let Some(dir) = cwd {
337 cmd.current_dir(dir);
338 }
339
340 // Apply pre-spawn hooks (cgroup path selection, setrlimit
341 // via `pre_exec`). Errors bubble up so callers see
342 // configuration problems early — matches the pre-refactor
343 // LSP behavior.
344 let post_spawn = match limits {
345 Some(lim) => lim
346 .apply_to_command(&mut cmd)
347 .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
348 None => PostSpawnAction::default(),
349 };
350
351 let child = cmd
352 .spawn()
353 .map_err(|e| SpawnError::Process(e.to_string()))?;
354 Ok(StdioChild::from_local_tokio_child(child, post_spawn))
355 }
356
357 async fn command_exists(&self, command: &str) -> bool {
358 which::which(command).is_ok()
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    /// The one-shot local spawner runs `echo` and captures its stdout.
    #[tokio::test]
    async fn test_local_spawner() {
        let spawner = LocalProcessSpawner;
        let result = spawner
            .spawn("echo".to_string(), vec!["hello".to_string()], None)
            .await
            .unwrap();

        assert_eq!(result.exit_code, 0);
        // `assert_eq!` prints the captured output when the check fails.
        assert_eq!(result.stdout.trim(), "hello");
    }

    /// The long-running local spawner pipes stdout, the child exits
    /// successfully, and the handle reports a local spawn.
    #[tokio::test]
    async fn local_long_running_spawn_stdio_pipes_output() {
        let spawner = LocalLongRunningSpawner;
        let mut child = spawner
            .spawn_stdio(
                "sh",
                &["-c".into(), "echo hi".into()],
                Vec::new(),
                None,
                None,
            )
            .await
            .expect("spawn succeeds");

        let mut stdout = child.take_stdout().expect("stdout piped");
        let mut buf = String::new();
        stdout.read_to_string(&mut buf).await.unwrap();
        assert_eq!(buf.trim(), "hi");

        let status = child.wait().await.unwrap();
        assert!(status.success());
        assert!(child.spawned_locally());
    }

    /// `command_exists` finds `sh` and rejects a name that should not
    /// exist on any machine.
    #[tokio::test]
    async fn local_long_running_command_exists_for_sh() {
        let spawner = LocalLongRunningSpawner;
        assert!(spawner.command_exists("sh").await);
        assert!(
            !spawner
                .command_exists("fresh-unlikely-binary-name-ygzu9")
                .await
        );
    }
}
413}