1use crate::services::process_hidden::HideWindow;
19use crate::services::process_limits::PostSpawnAction;
20use crate::services::remote::channel::{AgentChannel, ChannelError};
21use crate::services::remote::protocol::{decode_base64, exec_params};
22use crate::types::ProcessLimits;
23use std::path::Path;
24use std::process::ExitStatus;
25use std::sync::Arc;
26use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
27
/// Outcome of a finished process as reported by a [`ProcessSpawner`].
///
/// `PartialEq`/`Eq` are derived so results can be compared directly (for
/// example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpawnResult {
    /// Text collected from the process's standard output. The spawners in
    /// this file leave it empty when stdout was written to a file instead.
    pub stdout: String,
    /// Text collected from the process's standard error.
    pub stderr: String,
    /// Exit code of the process; the spawners in this file use `-1` when no
    /// code is available.
    pub exit_code: i32,
}
35
/// Errors returned by the process spawners in this file.
#[derive(Debug, thiserror::Error)]
pub enum SpawnError {
    /// The agent channel reported an error. `#[from]` lets `?` convert a
    /// [`ChannelError`] into this variant automatically.
    #[error("Channel error: {0}")]
    Channel(#[from] ChannelError),

    /// Spawning, waiting on, or doing I/O for a process failed; the payload
    /// is the human-readable message.
    #[error("Process error: {0}")]
    Process(String),

    /// A payload could not be decoded.
    // NOTE(review): no code visible in this file constructs this variant.
    #[error("Decode error: {0}")]
    Decode(String),
}
48
49#[async_trait::async_trait]
54pub trait ProcessSpawner: Send + Sync {
55 async fn spawn(
57 &self,
58 command: String,
59 args: Vec<String>,
60 cwd: Option<String>,
61 ) -> Result<SpawnResult, SpawnError>;
62
63 async fn spawn_to_file(
70 &self,
71 command: String,
72 args: Vec<String>,
73 cwd: Option<String>,
74 stdout_to: std::path::PathBuf,
75 ) -> Result<SpawnResult, SpawnError> {
76 let result = self.spawn(command, args, cwd).await?;
79 if result.exit_code == 0 || !result.stdout.is_empty() {
80 std::fs::write(&stdout_to, result.stdout.as_bytes())
81 .map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
82 }
83 Ok(SpawnResult {
84 stdout: String::new(),
85 stderr: result.stderr,
86 exit_code: result.exit_code,
87 })
88 }
89
90 async fn spawn_cancellable(
100 &self,
101 command: String,
102 args: Vec<String>,
103 cwd: Option<String>,
104 stdout_to: Option<std::path::PathBuf>,
105 _kill_rx: tokio::sync::oneshot::Receiver<()>,
106 ) -> Result<SpawnResult, SpawnError> {
107 match stdout_to {
108 Some(p) => self.spawn_to_file(command, args, cwd, p).await,
109 None => self.spawn(command, args, cwd).await,
110 }
111 }
112}
113
/// Stateless [`ProcessSpawner`] that runs commands on the local machine.
///
/// The type carries no data, so it is cheap to copy and can be created with
/// `LocalProcessSpawner` or `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalProcessSpawner;
118
119#[async_trait::async_trait]
120impl ProcessSpawner for LocalProcessSpawner {
121 async fn spawn(
122 &self,
123 command: String,
124 args: Vec<String>,
125 cwd: Option<String>,
126 ) -> Result<SpawnResult, SpawnError> {
127 let mut cmd = tokio::process::Command::new(&command);
128 cmd.args(&args);
129 cmd.hide_window();
130
131 if let Some(ref dir) = cwd {
132 cmd.current_dir(dir);
133 }
134
135 let output = cmd
136 .output()
137 .await
138 .map_err(|e| SpawnError::Process(e.to_string()))?;
139
140 Ok(SpawnResult {
141 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
142 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
143 exit_code: output.status.code().unwrap_or(-1),
144 })
145 }
146
147 async fn spawn_cancellable(
151 &self,
152 command: String,
153 args: Vec<String>,
154 cwd: Option<String>,
155 stdout_to: Option<std::path::PathBuf>,
156 kill_rx: tokio::sync::oneshot::Receiver<()>,
157 ) -> Result<SpawnResult, SpawnError> {
158 use std::process::Stdio;
159 use tokio::io::AsyncReadExt;
160
161 let mut cmd = tokio::process::Command::new(&command);
162 cmd.args(&args);
163 cmd.hide_window();
164 cmd.stdout(Stdio::piped());
165 cmd.stderr(Stdio::piped());
166 if let Some(ref dir) = cwd {
167 cmd.current_dir(dir);
168 }
169
170 if let Some(ref path) = stdout_to {
175 if let Some(parent) = path.parent() {
176 if !parent.as_os_str().is_empty() {
177 tokio::fs::create_dir_all(parent).await.map_err(|e| {
178 SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
179 })?;
180 }
181 }
182 }
183
184 let mut child = cmd
185 .spawn()
186 .map_err(|e| SpawnError::Process(e.to_string()))?;
187
188 let mut child_stdout = child
189 .stdout
190 .take()
191 .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
192 let mut child_stderr = child
193 .stderr
194 .take()
195 .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
196
197 let stdout_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> = match stdout_to {
200 Some(path) => tokio::spawn(async move {
201 let mut file = tokio::fs::File::create(&path).await?;
202 tokio::io::copy(&mut child_stdout, &mut file).await?;
203 use tokio::io::AsyncWriteExt;
204 if let Err(e) = file.flush().await {
209 tracing::warn!("spawn_cancellable: file flush failed: {}", e);
210 }
211 if let Err(e) = file.sync_all().await {
212 tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
213 }
214 Ok(Vec::new())
215 }),
216 None => tokio::spawn(async move {
217 let mut buf = Vec::new();
218 child_stdout.read_to_end(&mut buf).await?;
219 Ok(buf)
220 }),
221 };
222 let stderr_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> =
223 tokio::spawn(async move {
224 let mut buf = Vec::new();
225 child_stderr.read_to_end(&mut buf).await?;
226 Ok(buf)
227 });
228
229 let exit_code = tokio::select! {
233 status = child.wait() => status
234 .map(|s| s.code().unwrap_or(-1))
235 .unwrap_or(-1),
236 _ = kill_rx => {
237 if let Err(e) = child.start_kill() {
242 tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
243 }
244 child.wait().await.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1)
245 }
246 };
247
248 let stdout_bytes = stdout_task
251 .await
252 .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
253 .map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
254 let stderr_bytes = stderr_task
255 .await
256 .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
257 .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
258
259 Ok(SpawnResult {
260 stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
261 stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
262 exit_code,
263 })
264 }
265
266 async fn spawn_to_file(
270 &self,
271 command: String,
272 args: Vec<String>,
273 cwd: Option<String>,
274 stdout_to: std::path::PathBuf,
275 ) -> Result<SpawnResult, SpawnError> {
276 use std::process::Stdio;
277 use tokio::io::AsyncWriteExt;
278
279 let mut cmd = tokio::process::Command::new(&command);
280 cmd.args(&args);
281 cmd.hide_window();
282 cmd.stdout(Stdio::piped());
283 cmd.stderr(Stdio::piped());
284 if let Some(ref dir) = cwd {
285 cmd.current_dir(dir);
286 }
287
288 if let Some(parent) = stdout_to.parent() {
292 if !parent.as_os_str().is_empty() {
293 tokio::fs::create_dir_all(parent).await.map_err(|e| {
294 SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
295 })?;
296 }
297 }
298
299 let mut file = tokio::fs::File::create(&stdout_to)
300 .await
301 .map_err(|e| SpawnError::Process(format!("create {:?}: {}", stdout_to, e)))?;
302
303 let mut child = cmd
304 .spawn()
305 .map_err(|e| SpawnError::Process(e.to_string()))?;
306
307 let mut child_stdout = child
308 .stdout
309 .take()
310 .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
311 let mut child_stderr = child
312 .stderr
313 .take()
314 .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
315
316 let stdout_task = tokio::spawn(async move {
319 let res = tokio::io::copy(&mut child_stdout, &mut file).await;
320 if let Err(e) = file.flush().await {
325 tracing::warn!("spawn_to_file: file flush failed: {}", e);
326 }
327 if let Err(e) = file.sync_all().await {
328 tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
329 }
330 res
331 });
332 let stderr_task = tokio::spawn(async move {
333 let mut buf = Vec::new();
334 let res = tokio::io::copy(&mut child_stderr, &mut buf).await;
335 res.map(|_| buf)
336 });
337
338 let status = child
339 .wait()
340 .await
341 .map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;
342
343 stdout_task
347 .await
348 .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
349 .map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
350 let stderr_bytes = stderr_task
351 .await
352 .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
353 .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
354
355 Ok(SpawnResult {
356 stdout: String::new(),
357 stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
358 exit_code: status.code().unwrap_or(-1),
359 })
360 }
361}
362
363pub struct RemoteProcessSpawner {
365 channel: Arc<AgentChannel>,
366}
367
368impl RemoteProcessSpawner {
369 pub fn new(channel: Arc<AgentChannel>) -> Self {
371 Self { channel }
372 }
373}
374
375#[async_trait::async_trait]
376impl ProcessSpawner for RemoteProcessSpawner {
377 async fn spawn(
378 &self,
379 command: String,
380 args: Vec<String>,
381 cwd: Option<String>,
382 ) -> Result<SpawnResult, SpawnError> {
383 let params = exec_params(&command, &args, cwd.as_deref());
384
385 let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
387
388 let mut stdout = Vec::new();
389 let mut stderr = Vec::new();
390
391 while let Some(data) = data_rx.recv().await {
393 if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
394 if let Ok(decoded) = decode_base64(out) {
395 stdout.extend_from_slice(&decoded);
396 }
397 }
398 if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
399 if let Ok(decoded) = decode_base64(err) {
400 stderr.extend_from_slice(&decoded);
401 }
402 }
403 }
404
405 let result = result_rx
407 .await
408 .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
409 .map_err(SpawnError::Process)?;
410
411 let exit_code = result
412 .get("code")
413 .and_then(|v| v.as_i64())
414 .map(|c| c as i32)
415 .unwrap_or(-1);
416
417 Ok(SpawnResult {
418 stdout: String::from_utf8_lossy(&stdout).to_string(),
419 stderr: String::from_utf8_lossy(&stderr).to_string(),
420 exit_code,
421 })
422 }
423
424 async fn spawn_to_file(
425 &self,
426 _command: String,
427 _args: Vec<String>,
428 _cwd: Option<String>,
429 _stdout_to: std::path::PathBuf,
430 ) -> Result<SpawnResult, SpawnError> {
431 Err(SpawnError::Process(
432 "stdoutTo is not supported for remote processes".to_string(),
433 ))
434 }
435}
436
437pub struct StdioChild {
457 inner: tokio::process::Child,
458 stdin: Option<ChildStdin>,
459 stdout: Option<ChildStdout>,
460 stderr: Option<ChildStderr>,
461 spawned_locally: bool,
462}
463
464impl StdioChild {
465 pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
476 let stdin = child.stdin.take();
477 let stdout = child.stdout.take();
478 let stderr = child.stderr.take();
479 Self {
480 inner: child,
481 stdin,
482 stdout,
483 stderr,
484 spawned_locally,
485 }
486 }
487
488 pub fn from_local_tokio_child(
494 child: tokio::process::Child,
495 post_spawn: PostSpawnAction,
496 ) -> Self {
497 let out = Self::from_tokio_child(child, true);
498 if let Some(pid) = out.inner.id() {
499 post_spawn.apply_to_child(pid);
500 }
501 out
502 }
503
504 pub fn take_stdin(&mut self) -> Option<ChildStdin> {
506 self.stdin.take()
507 }
508
509 pub fn take_stdout(&mut self) -> Option<ChildStdout> {
511 self.stdout.take()
512 }
513
514 pub fn take_stderr(&mut self) -> Option<ChildStderr> {
516 self.stderr.take()
517 }
518
519 pub fn id(&self) -> Option<u32> {
523 self.inner.id()
524 }
525
526 pub fn spawned_locally(&self) -> bool {
530 self.spawned_locally
531 }
532
533 pub async fn kill(&mut self) -> std::io::Result<()> {
535 self.inner.kill().await
536 }
537
538 pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
540 self.inner.wait().await
541 }
542}
543
/// Starts processes whose stdio pipes are handed back to the caller as a
/// [`StdioChild`] instead of being collected to completion.
#[async_trait::async_trait]
pub trait LongRunningSpawner: Send + Sync {
    /// Starts `command` with `args`, the extra environment pairs in `env`,
    /// an optional working directory and optional process limits.
    ///
    /// Returns the running child wrapped in a [`StdioChild`], or a
    /// [`SpawnError`] when it could not be started.
    async fn spawn_stdio(
        &self,
        command: &str,
        args: &[String],
        env: Vec<(String, String)>,
        cwd: Option<&Path>,
        limits: Option<&ProcessLimits>,
    ) -> Result<StdioChild, SpawnError>;

    /// Reports whether `command` can be found by this spawner.
    async fn command_exists(&self, command: &str) -> bool;
}
585
/// Stateless [`LongRunningSpawner`] that starts processes on the local
/// machine.
///
/// The type carries no data, so it is cheap to copy and can be created with
/// `LocalLongRunningSpawner` or `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalLongRunningSpawner;
594
595#[async_trait::async_trait]
596impl LongRunningSpawner for LocalLongRunningSpawner {
597 async fn spawn_stdio(
598 &self,
599 command: &str,
600 args: &[String],
601 env: Vec<(String, String)>,
602 cwd: Option<&Path>,
603 limits: Option<&ProcessLimits>,
604 ) -> Result<StdioChild, SpawnError> {
605 let mut cmd = tokio::process::Command::new(command);
606 cmd.args(args)
607 .envs(env)
608 .stdin(std::process::Stdio::piped())
609 .stdout(std::process::Stdio::piped())
610 .stderr(std::process::Stdio::piped())
611 .hide_window()
612 .kill_on_drop(true);
613 if let Some(dir) = cwd {
614 cmd.current_dir(dir);
615 }
616
617 let post_spawn = match limits {
622 Some(lim) => lim
623 .apply_to_command(&mut cmd)
624 .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
625 None => PostSpawnAction::default(),
626 };
627
628 let child = cmd
629 .spawn()
630 .map_err(|e| SpawnError::Process(e.to_string()))?;
631 Ok(StdioChild::from_local_tokio_child(child, post_spawn))
632 }
633
634 async fn command_exists(&self, command: &str) -> bool {
635 which::which(command).is_ok()
636 }
637}
638
/// Tests for the local spawners. They run the `echo`, `sleep` and `sh`
/// programs, so those must be available on the machine running the tests.
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    /// The buffered spawner returns the command's stdout and exit code.
    #[tokio::test]
    async fn test_local_spawner() {
        let spawner = LocalProcessSpawner;
        let result = spawner
            .spawn("echo".to_string(), vec!["hello".to_string()], None)
            .await
            .unwrap();

        assert_eq!(result.exit_code, 0);
        // `assert_eq!` prints both values on failure, unlike `assert!(a == b)`.
        assert_eq!(result.stdout.trim(), "hello");
    }

    /// `spawn_to_file` writes stdout to disk and returns an empty `stdout`.
    #[tokio::test]
    async fn test_local_spawner_stdout_to_file() {
        let spawner = LocalProcessSpawner;
        let tmp =
            std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
        // Best-effort removal of a leftover file from an earlier run; the
        // file usually does not exist, so the result is deliberately ignored.
        #[allow(clippy::let_underscore_must_use)]
        let _ = std::fs::remove_file(&tmp);
        let result = spawner
            .spawn_to_file(
                "echo".to_string(),
                vec!["hello-from-disk".to_string()],
                None,
                tmp.clone(),
            )
            .await
            .unwrap();

        assert_eq!(result.exit_code, 0);
        assert!(
            result.stdout.is_empty(),
            "stdout should be empty when streaming"
        );
        let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
        assert_eq!(contents.trim(), "hello-from-disk");
        // Best-effort cleanup; a failure here must not fail the test.
        #[allow(clippy::let_underscore_must_use)]
        let _ = std::fs::remove_file(&tmp);
    }

    /// Sending on the kill channel ends a long-running child promptly.
    #[tokio::test]
    async fn test_local_spawner_cancellable_kill() {
        let spawner = LocalProcessSpawner;
        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();

        let task = tokio::spawn(async move {
            spawner
                .spawn_cancellable(
                    "sleep".to_string(),
                    vec!["30".to_string()],
                    None,
                    None,
                    kill_rx,
                )
                .await
        });

        // Give the child a moment to start before asking for the kill.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        // The send result is ignored on purpose: the assertions below check
        // the outcome of the spawned task instead.
        #[allow(clippy::let_underscore_must_use)]
        let _ = kill_tx.send(());

        let start = std::time::Instant::now();
        let result = task.await.unwrap().unwrap();
        let elapsed = start.elapsed();

        assert!(
            elapsed < std::time::Duration::from_secs(5),
            "kill should be prompt, took {:?}",
            elapsed
        );
        assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
    }

    /// `spawn_stdio` pipes stdout and marks the child as locally spawned.
    #[tokio::test]
    async fn local_long_running_spawn_stdio_pipes_output() {
        let spawner = LocalLongRunningSpawner;
        let mut child = spawner
            .spawn_stdio(
                "sh",
                &["-c".into(), "echo hi".into()],
                Vec::new(),
                None,
                None,
            )
            .await
            .expect("spawn succeeds");

        let mut stdout = child.take_stdout().expect("stdout piped");
        let mut buf = String::new();
        stdout.read_to_string(&mut buf).await.unwrap();
        assert_eq!(buf.trim(), "hi");

        let status = child.wait().await.unwrap();
        assert!(status.success());
        assert!(child.spawned_locally());
    }

    /// `command_exists` finds `sh` and rejects a made-up binary name.
    #[tokio::test]
    async fn local_long_running_command_exists_for_sh() {
        let spawner = LocalLongRunningSpawner;
        assert!(spawner.command_exists("sh").await);
        assert!(
            !spawner
                .command_exists("fresh-unlikely-binary-name-ygzu9")
                .await
        );
    }
}