1use crate::services::env_provider::EnvProvider;
19use crate::services::process_hidden::HideWindow;
20use crate::services::process_limits::PostSpawnAction;
21use crate::services::remote::channel::{AgentChannel, ChannelError};
22use crate::services::remote::protocol::{decode_base64, exec_params};
23use crate::services::workspace_trust::{gate, WorkspaceTrust};
24use crate::types::ProcessLimits;
25
26async fn local_captured_env(provider: &EnvProvider) -> Vec<(String, String)> {
31 provider
32 .current(|script| async move {
33 let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
34 let output = tokio::process::Command::new(&shell)
35 .arg("-lc")
36 .arg(&script)
37 .hide_window()
38 .output()
39 .await
40 .ok()?;
41 Some(String::from_utf8_lossy(&output.stdout).into_owned())
42 })
43 .await
44}
45use std::path::Path;
46use std::process::ExitStatus;
47use std::sync::Arc;
48use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
49
50#[derive(Debug, Clone)]
52pub struct SpawnResult {
53 pub stdout: String,
54 pub stderr: String,
55 pub exit_code: i32,
56}
57
58#[derive(Debug, thiserror::Error)]
60pub enum SpawnError {
61 #[error("Channel error: {0}")]
62 Channel(#[from] ChannelError),
63
64 #[error("Process error: {0}")]
65 Process(String),
66
67 #[error("Decode error: {0}")]
68 Decode(String),
69}
70
71#[async_trait::async_trait]
76pub trait ProcessSpawner: Send + Sync {
77 async fn spawn(
79 &self,
80 command: String,
81 args: Vec<String>,
82 cwd: Option<String>,
83 ) -> Result<SpawnResult, SpawnError>;
84
85 async fn spawn_to_file(
92 &self,
93 command: String,
94 args: Vec<String>,
95 cwd: Option<String>,
96 stdout_to: std::path::PathBuf,
97 ) -> Result<SpawnResult, SpawnError> {
98 let result = self.spawn(command, args, cwd).await?;
101 if result.exit_code == 0 || !result.stdout.is_empty() {
102 std::fs::write(&stdout_to, result.stdout.as_bytes())
103 .map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
104 }
105 Ok(SpawnResult {
106 stdout: String::new(),
107 stderr: result.stderr,
108 exit_code: result.exit_code,
109 })
110 }
111
112 async fn spawn_cancellable(
122 &self,
123 command: String,
124 args: Vec<String>,
125 cwd: Option<String>,
126 stdout_to: Option<std::path::PathBuf>,
127 _kill_rx: tokio::sync::oneshot::Receiver<()>,
128 ) -> Result<SpawnResult, SpawnError> {
129 match stdout_to {
130 Some(p) => self.spawn_to_file(command, args, cwd, p).await,
131 None => self.spawn(command, args, cwd).await,
132 }
133 }
134}
135
136pub struct LocalProcessSpawner {
143 env: Arc<EnvProvider>,
144 trust: Arc<WorkspaceTrust>,
145}
146
147impl LocalProcessSpawner {
148 pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
151 Self { env, trust }
152 }
153
154 async fn apply_env(&self, cmd: &mut tokio::process::Command) {
155 let env = local_captured_env(&self.env).await;
156 if !env.is_empty() {
157 cmd.envs(env.iter().map(|(k, v)| (k.as_str(), v.as_str())));
158 }
159 }
160}
161
162#[async_trait::async_trait]
163impl ProcessSpawner for LocalProcessSpawner {
164 async fn spawn(
165 &self,
166 command: String,
167 args: Vec<String>,
168 cwd: Option<String>,
169 ) -> Result<SpawnResult, SpawnError> {
170 gate(&self.trust, &command, cwd.as_deref())?;
171 let mut cmd = tokio::process::Command::new(&command);
172 cmd.args(&args);
173 self.apply_env(&mut cmd).await;
174 cmd.hide_window();
175
176 if let Some(ref dir) = cwd {
177 cmd.current_dir(dir);
178 }
179
180 let output = cmd
181 .output()
182 .await
183 .map_err(|e| SpawnError::Process(e.to_string()))?;
184
185 Ok(SpawnResult {
186 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
187 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
188 exit_code: output.status.code().unwrap_or(-1),
189 })
190 }
191
192 async fn spawn_cancellable(
196 &self,
197 command: String,
198 args: Vec<String>,
199 cwd: Option<String>,
200 stdout_to: Option<std::path::PathBuf>,
201 kill_rx: tokio::sync::oneshot::Receiver<()>,
202 ) -> Result<SpawnResult, SpawnError> {
203 use std::process::Stdio;
204 use tokio::io::AsyncReadExt;
205
206 gate(&self.trust, &command, cwd.as_deref())?;
207 let mut cmd = tokio::process::Command::new(&command);
208 cmd.args(&args);
209 self.apply_env(&mut cmd).await;
210 cmd.hide_window();
211 cmd.stdout(Stdio::piped());
212 cmd.stderr(Stdio::piped());
213 if let Some(ref dir) = cwd {
214 cmd.current_dir(dir);
215 }
216
217 if let Some(ref path) = stdout_to {
222 if let Some(parent) = path.parent() {
223 if !parent.as_os_str().is_empty() {
224 tokio::fs::create_dir_all(parent).await.map_err(|e| {
225 SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
226 })?;
227 }
228 }
229 }
230
231 let mut child = cmd
232 .spawn()
233 .map_err(|e| SpawnError::Process(e.to_string()))?;
234
235 let mut child_stdout = child
236 .stdout
237 .take()
238 .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
239 let mut child_stderr = child
240 .stderr
241 .take()
242 .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
243
244 let stdout_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> = match stdout_to {
247 Some(path) => tokio::spawn(async move {
248 let mut file = tokio::fs::File::create(&path).await?;
249 tokio::io::copy(&mut child_stdout, &mut file).await?;
250 use tokio::io::AsyncWriteExt;
251 if let Err(e) = file.flush().await {
256 tracing::warn!("spawn_cancellable: file flush failed: {}", e);
257 }
258 if let Err(e) = file.sync_all().await {
259 tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
260 }
261 Ok(Vec::new())
262 }),
263 None => tokio::spawn(async move {
264 let mut buf = Vec::new();
265 child_stdout.read_to_end(&mut buf).await?;
266 Ok(buf)
267 }),
268 };
269 let stderr_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> =
270 tokio::spawn(async move {
271 let mut buf = Vec::new();
272 child_stderr.read_to_end(&mut buf).await?;
273 Ok(buf)
274 });
275
276 let exit_code = tokio::select! {
280 status = child.wait() => status
281 .map(|s| s.code().unwrap_or(-1))
282 .unwrap_or(-1),
283 _ = kill_rx => {
284 if let Err(e) = child.start_kill() {
289 tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
290 }
291 child.wait().await.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1)
292 }
293 };
294
295 let stdout_bytes = stdout_task
298 .await
299 .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
300 .map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
301 let stderr_bytes = stderr_task
302 .await
303 .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
304 .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
305
306 Ok(SpawnResult {
307 stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
308 stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
309 exit_code,
310 })
311 }
312
313 async fn spawn_to_file(
317 &self,
318 command: String,
319 args: Vec<String>,
320 cwd: Option<String>,
321 stdout_to: std::path::PathBuf,
322 ) -> Result<SpawnResult, SpawnError> {
323 use std::process::Stdio;
324 use tokio::io::AsyncWriteExt;
325
326 gate(&self.trust, &command, cwd.as_deref())?;
327 let mut cmd = tokio::process::Command::new(&command);
328 cmd.args(&args);
329 self.apply_env(&mut cmd).await;
330 cmd.hide_window();
331 cmd.stdout(Stdio::piped());
332 cmd.stderr(Stdio::piped());
333 if let Some(ref dir) = cwd {
334 cmd.current_dir(dir);
335 }
336
337 if let Some(parent) = stdout_to.parent() {
341 if !parent.as_os_str().is_empty() {
342 tokio::fs::create_dir_all(parent).await.map_err(|e| {
343 SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
344 })?;
345 }
346 }
347
348 let mut file = tokio::fs::File::create(&stdout_to)
349 .await
350 .map_err(|e| SpawnError::Process(format!("create {:?}: {}", stdout_to, e)))?;
351
352 let mut child = cmd
353 .spawn()
354 .map_err(|e| SpawnError::Process(e.to_string()))?;
355
356 let mut child_stdout = child
357 .stdout
358 .take()
359 .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
360 let mut child_stderr = child
361 .stderr
362 .take()
363 .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
364
365 let stdout_task = tokio::spawn(async move {
368 let res = tokio::io::copy(&mut child_stdout, &mut file).await;
369 if let Err(e) = file.flush().await {
374 tracing::warn!("spawn_to_file: file flush failed: {}", e);
375 }
376 if let Err(e) = file.sync_all().await {
377 tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
378 }
379 res
380 });
381 let stderr_task = tokio::spawn(async move {
382 let mut buf = Vec::new();
383 let res = tokio::io::copy(&mut child_stderr, &mut buf).await;
384 res.map(|_| buf)
385 });
386
387 let status = child
388 .wait()
389 .await
390 .map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;
391
392 stdout_task
396 .await
397 .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
398 .map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
399 let stderr_bytes = stderr_task
400 .await
401 .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
402 .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
403
404 Ok(SpawnResult {
405 stdout: String::new(),
406 stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
407 exit_code: status.code().unwrap_or(-1),
408 })
409 }
410}
411
412fn env_wrap(env: &[(String, String)], command: &str, args: &[String]) -> (String, Vec<String>) {
418 if env.is_empty() {
419 return (command.to_string(), args.to_vec());
420 }
421 let mut wrapped = Vec::with_capacity(env.len() + 1 + args.len());
422 for (k, v) in env {
423 wrapped.push(format!("{k}={v}"));
424 }
425 wrapped.push(command.to_string());
426 wrapped.extend(args.iter().cloned());
427 ("env".to_string(), wrapped)
428}
429
430pub struct RemoteProcessSpawner {
432 channel: Arc<AgentChannel>,
433 env: Arc<EnvProvider>,
434 trust: Arc<WorkspaceTrust>,
435}
436
437impl RemoteProcessSpawner {
438 pub fn new(
441 channel: Arc<AgentChannel>,
442 env: Arc<EnvProvider>,
443 trust: Arc<WorkspaceTrust>,
444 ) -> Self {
445 Self {
446 channel,
447 env,
448 trust,
449 }
450 }
451
452 async fn captured_env(&self) -> Vec<(String, String)> {
455 let channel = self.channel.clone();
456 self.env
457 .current(move |script| async move {
458 let params = exec_params("sh", &["-lc".to_string(), script], None);
459 let (mut data_rx, _result) =
460 channel.request_streaming("exec", params).await.ok()?;
461 let mut stdout = Vec::new();
462 while let Some(d) = data_rx.recv().await {
463 if let Some(out) = d.get("out").and_then(|v| v.as_str()) {
464 if let Ok(b) = decode_base64(out) {
465 stdout.extend_from_slice(&b);
466 }
467 }
468 }
469 Some(String::from_utf8_lossy(&stdout).into_owned())
470 })
471 .await
472 }
473}
474
475#[async_trait::async_trait]
476impl ProcessSpawner for RemoteProcessSpawner {
477 async fn spawn(
478 &self,
479 command: String,
480 args: Vec<String>,
481 cwd: Option<String>,
482 ) -> Result<SpawnResult, SpawnError> {
483 gate(&self.trust, &command, cwd.as_deref())?;
484 let captured = self.captured_env().await;
485 let (eff_cmd, eff_args) = env_wrap(&captured, &command, &args);
486 let params = exec_params(&eff_cmd, &eff_args, cwd.as_deref());
487
488 let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
490
491 let mut stdout = Vec::new();
492 let mut stderr = Vec::new();
493
494 while let Some(data) = data_rx.recv().await {
496 if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
497 if let Ok(decoded) = decode_base64(out) {
498 stdout.extend_from_slice(&decoded);
499 }
500 }
501 if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
502 if let Ok(decoded) = decode_base64(err) {
503 stderr.extend_from_slice(&decoded);
504 }
505 }
506 }
507
508 let result = result_rx
510 .await
511 .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
512 .map_err(SpawnError::Process)?;
513
514 let exit_code = result
515 .get("code")
516 .and_then(|v| v.as_i64())
517 .map(|c| c as i32)
518 .unwrap_or(-1);
519
520 Ok(SpawnResult {
521 stdout: String::from_utf8_lossy(&stdout).to_string(),
522 stderr: String::from_utf8_lossy(&stderr).to_string(),
523 exit_code,
524 })
525 }
526
527 async fn spawn_to_file(
528 &self,
529 _command: String,
530 _args: Vec<String>,
531 _cwd: Option<String>,
532 _stdout_to: std::path::PathBuf,
533 ) -> Result<SpawnResult, SpawnError> {
534 Err(SpawnError::Process(
535 "stdoutTo is not supported for remote processes".to_string(),
536 ))
537 }
538}
539
540pub struct StdioChild {
560 inner: tokio::process::Child,
561 stdin: Option<ChildStdin>,
562 stdout: Option<ChildStdout>,
563 stderr: Option<ChildStderr>,
564 spawned_locally: bool,
565}
566
567impl StdioChild {
568 pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
579 let stdin = child.stdin.take();
580 let stdout = child.stdout.take();
581 let stderr = child.stderr.take();
582 Self {
583 inner: child,
584 stdin,
585 stdout,
586 stderr,
587 spawned_locally,
588 }
589 }
590
591 pub fn from_local_tokio_child(
597 child: tokio::process::Child,
598 post_spawn: PostSpawnAction,
599 ) -> Self {
600 let out = Self::from_tokio_child(child, true);
601 if let Some(pid) = out.inner.id() {
602 post_spawn.apply_to_child(pid);
603 }
604 out
605 }
606
607 pub fn take_stdin(&mut self) -> Option<ChildStdin> {
609 self.stdin.take()
610 }
611
612 pub fn take_stdout(&mut self) -> Option<ChildStdout> {
614 self.stdout.take()
615 }
616
617 pub fn take_stderr(&mut self) -> Option<ChildStderr> {
619 self.stderr.take()
620 }
621
622 pub fn id(&self) -> Option<u32> {
626 self.inner.id()
627 }
628
629 pub fn spawned_locally(&self) -> bool {
633 self.spawned_locally
634 }
635
636 pub async fn kill(&mut self) -> std::io::Result<()> {
638 self.inner.kill().await
639 }
640
641 pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
643 self.inner.wait().await
644 }
645}
646
647#[async_trait::async_trait]
666pub trait LongRunningSpawner: Send + Sync {
667 async fn spawn_stdio(
673 &self,
674 command: &str,
675 args: &[String],
676 env: Vec<(String, String)>,
677 cwd: Option<&Path>,
678 limits: Option<&ProcessLimits>,
679 ) -> Result<StdioChild, SpawnError>;
680
681 async fn command_exists(&self, command: &str) -> bool;
687}
688
689pub struct LocalLongRunningSpawner {
697 env: Arc<EnvProvider>,
698 trust: Arc<WorkspaceTrust>,
699}
700
701impl LocalLongRunningSpawner {
702 pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
705 Self { env, trust }
706 }
707}
708
709#[async_trait::async_trait]
710impl LongRunningSpawner for LocalLongRunningSpawner {
711 async fn spawn_stdio(
712 &self,
713 command: &str,
714 args: &[String],
715 env: Vec<(String, String)>,
716 cwd: Option<&Path>,
717 limits: Option<&ProcessLimits>,
718 ) -> Result<StdioChild, SpawnError> {
719 gate(
720 &self.trust,
721 command,
722 cwd.map(|p| p.to_string_lossy()).as_deref(),
723 )?;
724 let captured = local_captured_env(&self.env).await;
725 let mut cmd = tokio::process::Command::new(command);
726 cmd.args(args)
727 .envs(captured.iter().map(|(k, v)| (k.as_str(), v.as_str())))
729 .envs(env)
730 .stdin(std::process::Stdio::piped())
731 .stdout(std::process::Stdio::piped())
732 .stderr(std::process::Stdio::piped())
733 .hide_window()
734 .kill_on_drop(true);
735 if let Some(dir) = cwd {
736 cmd.current_dir(dir);
737 }
738
739 let post_spawn = match limits {
744 Some(lim) => lim
745 .apply_to_command(&mut cmd)
746 .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
747 None => PostSpawnAction::default(),
748 };
749
750 let child = cmd
751 .spawn()
752 .map_err(|e| SpawnError::Process(e.to_string()))?;
753 Ok(StdioChild::from_local_tokio_child(child, post_spawn))
754 }
755
756 async fn command_exists(&self, command: &str) -> bool {
757 let captured = local_captured_env(&self.env).await;
762 if let Some((_, path)) = captured.iter().find(|(k, _)| k == "PATH") {
763 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
764 return which::which_in(command, Some(path), &cwd).is_ok();
765 }
766 which::which(command).is_ok()
767 }
768}
769
770fn shell_quote(s: &str) -> String {
773 let mut out = String::with_capacity(s.len() + 2);
774 out.push('\'');
775 for c in s.chars() {
776 if c == '\'' {
777 out.push_str("'\\''");
778 } else {
779 out.push(c);
780 }
781 }
782 out.push('\'');
783 out
784}
785
786fn build_remote_exec(
792 env: &[(String, String)],
793 cwd: Option<&str>,
794 command: &str,
795 args: &[String],
796) -> String {
797 let mut s = String::new();
798 if let Some(dir) = cwd {
799 s.push_str("cd ");
800 s.push_str(&shell_quote(dir));
801 s.push_str(" && ");
802 }
803 s.push_str("exec ");
804 if !env.is_empty() {
805 s.push_str("env ");
806 for (k, v) in env {
807 s.push_str(k);
808 s.push('=');
809 s.push_str(&shell_quote(v));
810 s.push(' ');
811 }
812 }
813 s.push_str(&shell_quote(command));
814 for a in args {
815 s.push(' ');
816 s.push_str(&shell_quote(a));
817 }
818 s
819}
820
821fn build_remote_command_exists(env: &[(String, String)], command: &str) -> String {
825 let mut s = String::new();
826 for (k, v) in env {
827 s.push_str("export ");
828 s.push_str(k);
829 s.push('=');
830 s.push_str(&shell_quote(v));
831 s.push_str("; ");
832 }
833 s.push_str("command -v ");
834 s.push_str(&shell_quote(command));
835 s.push_str(" >/dev/null 2>&1");
836 s
837}
838
839fn build_ssh_args(
844 params: &crate::services::remote::ConnectionParams,
845 remote_cmd: &str,
846) -> Vec<String> {
847 let mut a = vec![
848 "-o".to_string(),
849 "StrictHostKeyChecking=accept-new".to_string(),
850 "-o".to_string(),
851 "BatchMode=yes".to_string(),
852 ];
853 if let Some(port) = params.port {
854 a.push("-p".to_string());
855 a.push(port.to_string());
856 }
857 if let Some(ref identity) = params.identity_file {
858 a.push("-i".to_string());
859 a.push(identity.to_string_lossy().into_owned());
860 }
861 a.extend(params.extra_args.iter().cloned());
862 a.push(params.ssh_target());
863 a.push(remote_cmd.to_string());
864 a
865}
866
867pub fn build_ssh_terminal_args(
882 params: &crate::services::remote::ConnectionParams,
883 remote_dir: Option<&str>,
884) -> Vec<String> {
885 let mut a = vec![
886 "-t".to_string(),
887 "-o".to_string(),
888 "StrictHostKeyChecking=accept-new".to_string(),
889 ];
890 if let Some(port) = params.port {
891 a.push("-p".to_string());
892 a.push(port.to_string());
893 }
894 if let Some(ref identity) = params.identity_file {
895 a.push("-i".to_string());
896 a.push(identity.to_string_lossy().into_owned());
897 }
898 a.extend(params.extra_args.iter().cloned());
899 a.push(params.ssh_target());
900
901 let mut remote_cmd = String::new();
908 if let Some(dir) = remote_dir.filter(|d| !d.is_empty()) {
909 let quoted = shell_quote(dir);
910 remote_cmd.push_str(&format!(
911 "d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
912 ));
913 }
914 remote_cmd.push_str("exec ${SHELL:-/bin/sh} -l");
915 a.push(remote_cmd);
916 a
917}
918
919pub fn build_kube_terminal_args(target: &crate::services::remote::KubeTarget) -> Vec<String> {
933 let mut remote_cmd = String::new();
934 if let Some(dir) = target.workspace.as_deref().filter(|d| !d.is_empty()) {
935 let quoted = shell_quote(dir);
936 remote_cmd.push_str(&format!(
937 "d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
938 ));
939 }
940 remote_cmd.push_str("exec ${SHELL:-/bin/sh} -l");
941 crate::services::remote::transport::kubectl_exec_argv(
942 target,
943 &["-it"],
944 "sh",
945 &["-lc".to_string(), remote_cmd],
946 )
947}
948
949pub struct RemoteLongRunningSpawner {
963 params: crate::services::remote::ConnectionParams,
964 env: Arc<EnvProvider>,
965 trust: Arc<WorkspaceTrust>,
966}
967
968impl RemoteLongRunningSpawner {
969 pub fn new(
972 params: crate::services::remote::ConnectionParams,
973 env: Arc<EnvProvider>,
974 trust: Arc<WorkspaceTrust>,
975 ) -> Self {
976 Self { params, env, trust }
977 }
978
979 async fn captured_env(&self) -> Vec<(String, String)> {
982 let params = self.params.clone();
983 self.env
984 .current(move |script| async move {
985 let ssh_args = build_ssh_args(¶ms, &script);
986 let output = tokio::process::Command::new("ssh")
987 .args(&ssh_args)
988 .hide_window()
989 .output()
990 .await
991 .ok()?;
992 Some(String::from_utf8_lossy(&output.stdout).into_owned())
993 })
994 .await
995 }
996}
997
998#[async_trait::async_trait]
999impl LongRunningSpawner for RemoteLongRunningSpawner {
1000 async fn spawn_stdio(
1001 &self,
1002 command: &str,
1003 args: &[String],
1004 env: Vec<(String, String)>,
1005 cwd: Option<&Path>,
1006 _limits: Option<&ProcessLimits>,
1007 ) -> Result<StdioChild, SpawnError> {
1008 let cwd_str = cwd.map(|p| p.to_string_lossy().into_owned());
1011 gate(&self.trust, command, cwd_str.as_deref())?;
1012
1013 let mut merged = self.captured_env().await;
1016 merged.extend(env);
1017
1018 let remote = build_remote_exec(&merged, cwd_str.as_deref(), command, args);
1019 let ssh_args = build_ssh_args(&self.params, &remote);
1020
1021 let mut cmd = tokio::process::Command::new("ssh");
1022 cmd.args(&ssh_args)
1023 .stdin(std::process::Stdio::piped())
1024 .stdout(std::process::Stdio::piped())
1025 .stderr(std::process::Stdio::piped())
1026 .hide_window()
1027 .kill_on_drop(true);
1028
1029 let child = cmd
1030 .spawn()
1031 .map_err(|e| SpawnError::Process(e.to_string()))?;
1032 Ok(StdioChild::from_tokio_child(child, false))
1035 }
1036
1037 async fn command_exists(&self, command: &str) -> bool {
1038 let captured = self.captured_env().await;
1039 let remote = build_remote_command_exists(&captured, command);
1040 let ssh_args = build_ssh_args(&self.params, &remote);
1041 match tokio::process::Command::new("ssh")
1042 .args(&ssh_args)
1043 .hide_window()
1044 .output()
1045 .await
1046 {
1047 Ok(output) => output.status.success(),
1048 Err(_) => false,
1049 }
1050 }
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055 use super::*;
1056 use tokio::io::AsyncReadExt;
1057
1058 #[tokio::test]
1059 async fn test_local_spawner() {
1060 let spawner = LocalProcessSpawner::new(
1061 Arc::new(EnvProvider::inactive()),
1062 Arc::new(WorkspaceTrust::permissive()),
1063 );
1064 let result = spawner
1065 .spawn("echo".to_string(), vec!["hello".to_string()], None)
1066 .await
1067 .unwrap();
1068
1069 assert_eq!(result.exit_code, 0);
1070 assert!(result.stdout.trim() == "hello");
1071 }
1072
1073 #[tokio::test]
1074 async fn test_local_spawner_stdout_to_file() {
1075 let spawner = LocalProcessSpawner::new(
1076 Arc::new(EnvProvider::inactive()),
1077 Arc::new(WorkspaceTrust::permissive()),
1078 );
1079 let tmp =
1080 std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
1081 #[allow(clippy::let_underscore_must_use)]
1085 let _ = std::fs::remove_file(&tmp);
1086 let result = spawner
1087 .spawn_to_file(
1088 "echo".to_string(),
1089 vec!["hello-from-disk".to_string()],
1090 None,
1091 tmp.clone(),
1092 )
1093 .await
1094 .unwrap();
1095
1096 assert_eq!(result.exit_code, 0);
1097 assert!(
1098 result.stdout.is_empty(),
1099 "stdout should be empty when streaming"
1100 );
1101 let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
1102 assert_eq!(contents.trim(), "hello-from-disk");
1103 #[allow(clippy::let_underscore_must_use)]
1107 let _ = std::fs::remove_file(&tmp);
1108 }
1109
1110 #[tokio::test]
1111 async fn test_local_spawner_cancellable_kill() {
1112 let spawner = LocalProcessSpawner::new(
1113 Arc::new(EnvProvider::inactive()),
1114 Arc::new(WorkspaceTrust::permissive()),
1115 );
1116 let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
1117
1118 let task = tokio::spawn(async move {
1120 spawner
1121 .spawn_cancellable(
1122 "sleep".to_string(),
1123 vec!["30".to_string()],
1124 None,
1125 None,
1126 kill_rx,
1127 )
1128 .await
1129 });
1130
1131 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1132 #[allow(clippy::let_underscore_must_use)]
1138 let _ = kill_tx.send(());
1139
1140 let start = std::time::Instant::now();
1141 let result = task.await.unwrap().unwrap();
1142 let elapsed = start.elapsed();
1143
1144 assert!(
1147 elapsed < std::time::Duration::from_secs(5),
1148 "kill should be prompt, took {:?}",
1149 elapsed
1150 );
1151 assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
1152 }
1153
1154 #[tokio::test]
1155 async fn local_long_running_spawn_stdio_pipes_output() {
1156 let spawner = LocalLongRunningSpawner::new(
1157 Arc::new(EnvProvider::inactive()),
1158 Arc::new(WorkspaceTrust::permissive()),
1159 );
1160 let mut child = spawner
1161 .spawn_stdio(
1162 "sh",
1163 &["-c".into(), "echo hi".into()],
1164 Vec::new(),
1165 None,
1166 None,
1167 )
1168 .await
1169 .expect("spawn succeeds");
1170
1171 let mut stdout = child.take_stdout().expect("stdout piped");
1172 let mut buf = String::new();
1173 stdout.read_to_string(&mut buf).await.unwrap();
1174 assert_eq!(buf.trim(), "hi");
1175
1176 let status = child.wait().await.unwrap();
1177 assert!(status.success());
1178 assert!(child.spawned_locally());
1179 }
1180
1181 #[tokio::test]
1182 async fn local_long_running_command_exists_for_sh() {
1183 let spawner = LocalLongRunningSpawner::new(
1184 Arc::new(EnvProvider::inactive()),
1185 Arc::new(WorkspaceTrust::permissive()),
1186 );
1187 assert!(spawner.command_exists("sh").await);
1188 assert!(
1189 !spawner
1190 .command_exists("fresh-unlikely-binary-name-ygzu9")
1191 .await
1192 );
1193 }
1194
1195 #[cfg(unix)]
1199 #[tokio::test]
1200 async fn local_spawner_applies_active_env_provider() {
1201 let env = Arc::new(EnvProvider::inactive());
1204 env.set("export FRESH_ENV_TEST=hi-from-provider".into(), None);
1205 let spawner = LocalProcessSpawner::new(env, Arc::new(WorkspaceTrust::permissive()));
1206 let result = spawner
1207 .spawn(
1208 "sh".into(),
1209 vec!["-c".into(), "printf %s \"$FRESH_ENV_TEST\"".into()],
1210 None,
1211 )
1212 .await
1213 .unwrap();
1214 assert_eq!(result.exit_code, 0);
1215 assert_eq!(result.stdout, "hi-from-provider");
1216 }
1217
1218 #[tokio::test]
1219 async fn local_spawner_inactive_provider_injects_nothing() {
1220 let spawner = LocalProcessSpawner::new(
1221 Arc::new(EnvProvider::inactive()),
1222 Arc::new(WorkspaceTrust::permissive()),
1223 );
1224 let result = spawner
1225 .spawn(
1226 "sh".into(),
1227 vec!["-c".into(), "printf %s \"${FRESH_ENV_TEST:-unset}\"".into()],
1228 None,
1229 )
1230 .await
1231 .unwrap();
1232 assert_eq!(result.stdout, "unset");
1233 }
1234
1235 #[test]
1238 fn shell_quote_wraps_and_escapes() {
1239 assert_eq!(shell_quote("abc"), "'abc'");
1240 assert_eq!(shell_quote("a b/c"), "'a b/c'");
1241 assert_eq!(shell_quote("a'b"), "'a'\\''b'");
1242 }
1243
1244 #[test]
1245 fn build_remote_exec_with_cwd_and_env() {
1246 let env = vec![("VIRTUAL_ENV".to_string(), "/proj/.venv".to_string())];
1247 let s = build_remote_exec(&env, Some("/proj dir"), "python", &["x.py".to_string()]);
1248 assert_eq!(
1249 s,
1250 "cd '/proj dir' && exec env VIRTUAL_ENV='/proj/.venv' 'python' 'x.py'"
1251 );
1252 }
1253
1254 #[test]
1255 fn build_remote_exec_minimal() {
1256 assert_eq!(build_remote_exec(&[], None, "gopls", &[]), "exec 'gopls'");
1257 }
1258
1259 #[test]
1260 fn build_remote_command_exists_exports_env() {
1261 let env = vec![("PATH".to_string(), "/proj/.venv/bin:/usr/bin".to_string())];
1262 assert_eq!(
1263 build_remote_command_exists(&env, "pyright"),
1264 "export PATH='/proj/.venv/bin:/usr/bin'; command -v 'pyright' >/dev/null 2>&1"
1265 );
1266 }
1267
1268 #[test]
1269 fn build_ssh_args_full() {
1270 let params = crate::services::remote::ConnectionParams {
1271 user: Some("u".into()),
1272 host: "h".into(),
1273 port: Some(2222),
1274 identity_file: Some(std::path::PathBuf::from("/k")),
1275 extra_args: Vec::new(),
1276 };
1277 let a = build_ssh_args(¶ms, "echo hi");
1278 let expected: Vec<String> = [
1279 "-o",
1280 "StrictHostKeyChecking=accept-new",
1281 "-o",
1282 "BatchMode=yes",
1283 "-p",
1284 "2222",
1285 "-i",
1286 "/k",
1287 "u@h",
1288 "echo hi",
1289 ]
1290 .into_iter()
1291 .map(String::from)
1292 .collect();
1293 assert_eq!(a, expected);
1294 }
1295
1296 #[test]
1297 fn build_ssh_args_omits_user_and_threads_extra_args() {
1298 let params = crate::services::remote::ConnectionParams {
1300 user: None,
1301 host: "h".into(),
1302 port: None,
1303 identity_file: None,
1304 extra_args: vec!["-J".into(), "jump".into()],
1305 };
1306 let a = build_ssh_args(¶ms, "echo hi");
1307 let expected: Vec<String> = [
1308 "-o",
1309 "StrictHostKeyChecking=accept-new",
1310 "-o",
1311 "BatchMode=yes",
1312 "-J",
1313 "jump",
1314 "h",
1315 "echo hi",
1316 ]
1317 .into_iter()
1318 .map(String::from)
1319 .collect();
1320 assert_eq!(a, expected);
1321 }
1322
1323 #[test]
1324 fn build_ssh_terminal_args_forces_tty_and_login_shell() {
1325 let params = crate::services::remote::ConnectionParams {
1326 user: Some("u".into()),
1327 host: "h".into(),
1328 port: Some(2222),
1329 identity_file: Some(std::path::PathBuf::from("/k")),
1330 extra_args: Vec::new(),
1331 };
1332 let a = build_ssh_terminal_args(¶ms, Some("/proj dir"));
1333 let expected: Vec<String> = [
1334 "-t",
1335 "-o",
1336 "StrictHostKeyChecking=accept-new",
1337 "-p",
1338 "2222",
1339 "-i",
1340 "/k",
1341 "u@h",
1342 "d='/proj dir'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
1343 ]
1344 .into_iter()
1345 .map(String::from)
1346 .collect();
1347 assert_eq!(a, expected);
1348 assert!(!a.iter().any(|s| s == "BatchMode=yes"));
1350 }
1351
1352 #[test]
1353 fn build_ssh_terminal_args_without_dir_skips_cd() {
1354 let params = crate::services::remote::ConnectionParams {
1355 user: Some("u".into()),
1356 host: "h".into(),
1357 port: None,
1358 identity_file: None,
1359 extra_args: Vec::new(),
1360 };
1361 let a = build_ssh_terminal_args(¶ms, None);
1362 assert_eq!(
1363 a,
1364 vec![
1365 "-t",
1366 "-o",
1367 "StrictHostKeyChecking=accept-new",
1368 "u@h",
1369 "exec ${SHELL:-/bin/sh} -l",
1370 ]
1371 );
1372 assert_eq!(build_ssh_terminal_args(¶ms, Some("")), a);
1374 }
1375
1376 #[test]
1377 fn build_kube_terminal_args_allocates_tty_and_pins_cwd() {
1378 let target = crate::services::remote::KubeTarget {
1379 context: Some("prod".into()),
1380 namespace: "dev".into(),
1381 pod: "pod-1".into(),
1382 container: Some("app".into()),
1383 workspace: Some("/workspace".into()),
1384 };
1385 let a = build_kube_terminal_args(&target);
1386 let expected: Vec<String> = [
1387 "--context",
1388 "prod",
1389 "exec",
1390 "-it",
1391 "-n",
1392 "dev",
1393 "-c",
1394 "app",
1395 "pod-1",
1396 "--",
1397 "sh",
1398 "-lc",
1399 "d='/workspace'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
1400 ]
1401 .into_iter()
1402 .map(String::from)
1403 .collect();
1404 assert_eq!(a, expected);
1405 }
1406
1407 #[test]
1408 fn build_kube_terminal_args_without_workspace_skips_cd() {
1409 let target = crate::services::remote::KubeTarget {
1410 context: None,
1411 namespace: "dev".into(),
1412 pod: "pod-1".into(),
1413 container: None,
1414 workspace: None,
1415 };
1416 let a = build_kube_terminal_args(&target);
1417 assert_eq!(
1418 a,
1419 vec![
1420 "exec",
1421 "-it",
1422 "-n",
1423 "dev",
1424 "pod-1",
1425 "--",
1426 "sh",
1427 "-lc",
1428 "exec ${SHELL:-/bin/sh} -l",
1429 ]
1430 );
1431 }
1432}