1use crate::services::env_provider::EnvProvider;
19use crate::services::process_hidden::HideWindow;
20use crate::services::process_limits::PostSpawnAction;
21use crate::services::remote::channel::{AgentChannel, ChannelError};
22use crate::services::remote::protocol::{decode_base64, exec_params};
23use crate::services::workspace_trust::{gate, WorkspaceTrust};
24use crate::types::ProcessLimits;
25
26async fn local_captured_env(provider: &EnvProvider) -> Vec<(String, String)> {
31 provider
32 .current(|script| async move {
33 let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
34 let output = tokio::process::Command::new(&shell)
35 .arg("-lc")
36 .arg(&script)
37 .hide_window()
38 .output()
39 .await
40 .ok()?;
41 Some(String::from_utf8_lossy(&output.stdout).into_owned())
42 })
43 .await
44}
45use std::borrow::Cow;
46use std::path::Path;
47use std::process::ExitStatus;
48use std::sync::Arc;
49use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
50
51#[cfg(windows)]
73fn resolve_program(command: &str) -> Cow<'_, str> {
74 match which::which(command) {
75 Ok(path) => Cow::Owned(path.to_string_lossy().into_owned()),
76 Err(_) => Cow::Borrowed(command),
77 }
78}
79
80#[cfg(not(windows))]
82fn resolve_program(command: &str) -> Cow<'_, str> {
83 Cow::Borrowed(command)
84}
85
86#[derive(Debug, Clone)]
88pub struct SpawnResult {
89 pub stdout: String,
90 pub stderr: String,
91 pub exit_code: i32,
92}
93
94#[derive(Debug, thiserror::Error)]
96pub enum SpawnError {
97 #[error("Channel error: {0}")]
98 Channel(#[from] ChannelError),
99
100 #[error("Process error: {0}")]
101 Process(String),
102
103 #[error("Decode error: {0}")]
104 Decode(String),
105}
106
107#[async_trait::async_trait]
112pub trait ProcessSpawner: Send + Sync {
113 async fn spawn(
115 &self,
116 command: String,
117 args: Vec<String>,
118 cwd: Option<String>,
119 ) -> Result<SpawnResult, SpawnError>;
120
121 async fn spawn_to_file(
128 &self,
129 command: String,
130 args: Vec<String>,
131 cwd: Option<String>,
132 stdout_to: std::path::PathBuf,
133 ) -> Result<SpawnResult, SpawnError> {
134 let result = self.spawn(command, args, cwd).await?;
137 if result.exit_code == 0 || !result.stdout.is_empty() {
138 std::fs::write(&stdout_to, result.stdout.as_bytes())
139 .map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
140 }
141 Ok(SpawnResult {
142 stdout: String::new(),
143 stderr: result.stderr,
144 exit_code: result.exit_code,
145 })
146 }
147
148 async fn spawn_cancellable(
158 &self,
159 command: String,
160 args: Vec<String>,
161 cwd: Option<String>,
162 stdout_to: Option<std::path::PathBuf>,
163 _kill_rx: tokio::sync::oneshot::Receiver<()>,
164 ) -> Result<SpawnResult, SpawnError> {
165 match stdout_to {
166 Some(p) => self.spawn_to_file(command, args, cwd, p).await,
167 None => self.spawn(command, args, cwd).await,
168 }
169 }
170}
171
172pub struct LocalProcessSpawner {
179 env: Arc<EnvProvider>,
180 trust: Arc<WorkspaceTrust>,
181}
182
183impl LocalProcessSpawner {
184 pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
187 Self { env, trust }
188 }
189
190 async fn apply_env(&self, cmd: &mut tokio::process::Command) {
191 let env = local_captured_env(&self.env).await;
192 if !env.is_empty() {
193 cmd.envs(env.iter().map(|(k, v)| (k.as_str(), v.as_str())));
194 }
195 }
196}
197
198#[async_trait::async_trait]
199impl ProcessSpawner for LocalProcessSpawner {
200 async fn spawn(
201 &self,
202 command: String,
203 args: Vec<String>,
204 cwd: Option<String>,
205 ) -> Result<SpawnResult, SpawnError> {
206 gate(&self.trust, &command, cwd.as_deref())?;
207 let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
208 cmd.args(&args);
209 self.apply_env(&mut cmd).await;
210 cmd.hide_window();
211
212 if let Some(ref dir) = cwd {
213 cmd.current_dir(dir);
214 }
215
216 let output = cmd
217 .output()
218 .await
219 .map_err(|e| SpawnError::Process(e.to_string()))?;
220
221 Ok(SpawnResult {
222 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
223 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
224 exit_code: output.status.code().unwrap_or(-1),
225 })
226 }
227
228 async fn spawn_cancellable(
232 &self,
233 command: String,
234 args: Vec<String>,
235 cwd: Option<String>,
236 stdout_to: Option<std::path::PathBuf>,
237 kill_rx: tokio::sync::oneshot::Receiver<()>,
238 ) -> Result<SpawnResult, SpawnError> {
239 use std::process::Stdio;
240 use tokio::io::AsyncReadExt;
241
242 gate(&self.trust, &command, cwd.as_deref())?;
243 let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
244 cmd.args(&args);
245 self.apply_env(&mut cmd).await;
246 cmd.hide_window();
247 cmd.stdout(Stdio::piped());
248 cmd.stderr(Stdio::piped());
249 if let Some(ref dir) = cwd {
250 cmd.current_dir(dir);
251 }
252
253 if let Some(ref path) = stdout_to {
258 if let Some(parent) = path.parent() {
259 if !parent.as_os_str().is_empty() {
260 tokio::fs::create_dir_all(parent).await.map_err(|e| {
261 SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
262 })?;
263 }
264 }
265 }
266
267 let mut child = cmd
268 .spawn()
269 .map_err(|e| SpawnError::Process(e.to_string()))?;
270
271 let mut child_stdout = child
272 .stdout
273 .take()
274 .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
275 let mut child_stderr = child
276 .stderr
277 .take()
278 .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
279
280 let stdout_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> = match stdout_to {
283 Some(path) => tokio::spawn(async move {
284 let mut file = tokio::fs::File::create(&path).await?;
285 tokio::io::copy(&mut child_stdout, &mut file).await?;
286 use tokio::io::AsyncWriteExt;
287 if let Err(e) = file.flush().await {
292 tracing::warn!("spawn_cancellable: file flush failed: {}", e);
293 }
294 if let Err(e) = file.sync_all().await {
295 tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
296 }
297 Ok(Vec::new())
298 }),
299 None => tokio::spawn(async move {
300 let mut buf = Vec::new();
301 child_stdout.read_to_end(&mut buf).await?;
302 Ok(buf)
303 }),
304 };
305 let stderr_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> =
306 tokio::spawn(async move {
307 let mut buf = Vec::new();
308 child_stderr.read_to_end(&mut buf).await?;
309 Ok(buf)
310 });
311
312 let exit_code = tokio::select! {
316 status = child.wait() => status
317 .map(|s| s.code().unwrap_or(-1))
318 .unwrap_or(-1),
319 _ = kill_rx => {
320 if let Err(e) = child.start_kill() {
325 tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
326 }
327 child.wait().await.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1)
328 }
329 };
330
331 let stdout_bytes = stdout_task
334 .await
335 .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
336 .map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
337 let stderr_bytes = stderr_task
338 .await
339 .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
340 .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
341
342 Ok(SpawnResult {
343 stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
344 stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
345 exit_code,
346 })
347 }
348
349 async fn spawn_to_file(
353 &self,
354 command: String,
355 args: Vec<String>,
356 cwd: Option<String>,
357 stdout_to: std::path::PathBuf,
358 ) -> Result<SpawnResult, SpawnError> {
359 use std::process::Stdio;
360 use tokio::io::AsyncWriteExt;
361
362 gate(&self.trust, &command, cwd.as_deref())?;
363 let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
364 cmd.args(&args);
365 self.apply_env(&mut cmd).await;
366 cmd.hide_window();
367 cmd.stdout(Stdio::piped());
368 cmd.stderr(Stdio::piped());
369 if let Some(ref dir) = cwd {
370 cmd.current_dir(dir);
371 }
372
373 if let Some(parent) = stdout_to.parent() {
377 if !parent.as_os_str().is_empty() {
378 tokio::fs::create_dir_all(parent).await.map_err(|e| {
379 SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
380 })?;
381 }
382 }
383
384 let mut file = tokio::fs::File::create(&stdout_to)
385 .await
386 .map_err(|e| SpawnError::Process(format!("create {:?}: {}", stdout_to, e)))?;
387
388 let mut child = cmd
389 .spawn()
390 .map_err(|e| SpawnError::Process(e.to_string()))?;
391
392 let mut child_stdout = child
393 .stdout
394 .take()
395 .ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
396 let mut child_stderr = child
397 .stderr
398 .take()
399 .ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
400
401 let stdout_task = tokio::spawn(async move {
404 let res = tokio::io::copy(&mut child_stdout, &mut file).await;
405 if let Err(e) = file.flush().await {
410 tracing::warn!("spawn_to_file: file flush failed: {}", e);
411 }
412 if let Err(e) = file.sync_all().await {
413 tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
414 }
415 res
416 });
417 let stderr_task = tokio::spawn(async move {
418 let mut buf = Vec::new();
419 let res = tokio::io::copy(&mut child_stderr, &mut buf).await;
420 res.map(|_| buf)
421 });
422
423 let status = child
424 .wait()
425 .await
426 .map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;
427
428 stdout_task
432 .await
433 .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
434 .map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
435 let stderr_bytes = stderr_task
436 .await
437 .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
438 .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
439
440 Ok(SpawnResult {
441 stdout: String::new(),
442 stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
443 exit_code: status.code().unwrap_or(-1),
444 })
445 }
446}
447
448fn env_wrap(env: &[(String, String)], command: &str, args: &[String]) -> (String, Vec<String>) {
454 if env.is_empty() {
455 return (command.to_string(), args.to_vec());
456 }
457 let mut wrapped = Vec::with_capacity(env.len() + 1 + args.len());
458 for (k, v) in env {
459 wrapped.push(format!("{k}={v}"));
460 }
461 wrapped.push(command.to_string());
462 wrapped.extend(args.iter().cloned());
463 ("env".to_string(), wrapped)
464}
465
466pub struct RemoteProcessSpawner {
468 channel: Arc<AgentChannel>,
469 env: Arc<EnvProvider>,
470 trust: Arc<WorkspaceTrust>,
471}
472
473impl RemoteProcessSpawner {
474 pub fn new(
477 channel: Arc<AgentChannel>,
478 env: Arc<EnvProvider>,
479 trust: Arc<WorkspaceTrust>,
480 ) -> Self {
481 Self {
482 channel,
483 env,
484 trust,
485 }
486 }
487
488 async fn captured_env(&self) -> Vec<(String, String)> {
491 let channel = self.channel.clone();
492 self.env
493 .current(move |script| async move {
494 let params = exec_params("sh", &["-lc".to_string(), script], None);
495 let (mut data_rx, _result) =
496 channel.request_streaming("exec", params).await.ok()?;
497 let mut stdout = Vec::new();
498 while let Some(d) = data_rx.recv().await {
499 if let Some(out) = d.get("out").and_then(|v| v.as_str()) {
500 if let Ok(b) = decode_base64(out) {
501 stdout.extend_from_slice(&b);
502 }
503 }
504 }
505 Some(String::from_utf8_lossy(&stdout).into_owned())
506 })
507 .await
508 }
509}
510
511#[async_trait::async_trait]
512impl ProcessSpawner for RemoteProcessSpawner {
513 async fn spawn(
514 &self,
515 command: String,
516 args: Vec<String>,
517 cwd: Option<String>,
518 ) -> Result<SpawnResult, SpawnError> {
519 gate(&self.trust, &command, cwd.as_deref())?;
520 let captured = self.captured_env().await;
521 let (eff_cmd, eff_args) = env_wrap(&captured, &command, &args);
522 let params = exec_params(&eff_cmd, &eff_args, cwd.as_deref());
523
524 let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
526
527 let mut stdout = Vec::new();
528 let mut stderr = Vec::new();
529
530 while let Some(data) = data_rx.recv().await {
532 if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
533 if let Ok(decoded) = decode_base64(out) {
534 stdout.extend_from_slice(&decoded);
535 }
536 }
537 if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
538 if let Ok(decoded) = decode_base64(err) {
539 stderr.extend_from_slice(&decoded);
540 }
541 }
542 }
543
544 let result = result_rx
546 .await
547 .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
548 .map_err(SpawnError::Process)?;
549
550 let exit_code = result
551 .get("code")
552 .and_then(|v| v.as_i64())
553 .map(|c| c as i32)
554 .unwrap_or(-1);
555
556 Ok(SpawnResult {
557 stdout: String::from_utf8_lossy(&stdout).to_string(),
558 stderr: String::from_utf8_lossy(&stderr).to_string(),
559 exit_code,
560 })
561 }
562
563 async fn spawn_to_file(
564 &self,
565 _command: String,
566 _args: Vec<String>,
567 _cwd: Option<String>,
568 _stdout_to: std::path::PathBuf,
569 ) -> Result<SpawnResult, SpawnError> {
570 Err(SpawnError::Process(
571 "stdoutTo is not supported for remote processes".to_string(),
572 ))
573 }
574}
575
576pub struct StdioChild {
596 inner: tokio::process::Child,
597 stdin: Option<ChildStdin>,
598 stdout: Option<ChildStdout>,
599 stderr: Option<ChildStderr>,
600 spawned_locally: bool,
601}
602
603impl StdioChild {
604 pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
615 let stdin = child.stdin.take();
616 let stdout = child.stdout.take();
617 let stderr = child.stderr.take();
618 Self {
619 inner: child,
620 stdin,
621 stdout,
622 stderr,
623 spawned_locally,
624 }
625 }
626
627 pub fn from_local_tokio_child(
633 child: tokio::process::Child,
634 post_spawn: PostSpawnAction,
635 ) -> Self {
636 let out = Self::from_tokio_child(child, true);
637 if let Some(pid) = out.inner.id() {
638 post_spawn.apply_to_child(pid);
639 }
640 out
641 }
642
643 pub fn take_stdin(&mut self) -> Option<ChildStdin> {
645 self.stdin.take()
646 }
647
648 pub fn take_stdout(&mut self) -> Option<ChildStdout> {
650 self.stdout.take()
651 }
652
653 pub fn take_stderr(&mut self) -> Option<ChildStderr> {
655 self.stderr.take()
656 }
657
658 pub fn id(&self) -> Option<u32> {
662 self.inner.id()
663 }
664
665 pub fn spawned_locally(&self) -> bool {
669 self.spawned_locally
670 }
671
672 pub async fn kill(&mut self) -> std::io::Result<()> {
674 self.inner.kill().await
675 }
676
677 pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
679 self.inner.wait().await
680 }
681}
682
683#[async_trait::async_trait]
702pub trait LongRunningSpawner: Send + Sync {
703 async fn spawn_stdio(
709 &self,
710 command: &str,
711 args: &[String],
712 env: Vec<(String, String)>,
713 cwd: Option<&Path>,
714 limits: Option<&ProcessLimits>,
715 ) -> Result<StdioChild, SpawnError>;
716
717 async fn command_exists(&self, command: &str) -> bool;
723}
724
725pub struct LocalLongRunningSpawner {
733 env: Arc<EnvProvider>,
734 trust: Arc<WorkspaceTrust>,
735}
736
737impl LocalLongRunningSpawner {
738 pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
741 Self { env, trust }
742 }
743}
744
745#[async_trait::async_trait]
746impl LongRunningSpawner for LocalLongRunningSpawner {
747 async fn spawn_stdio(
748 &self,
749 command: &str,
750 args: &[String],
751 env: Vec<(String, String)>,
752 cwd: Option<&Path>,
753 limits: Option<&ProcessLimits>,
754 ) -> Result<StdioChild, SpawnError> {
755 gate(
756 &self.trust,
757 command,
758 cwd.map(|p| p.to_string_lossy()).as_deref(),
759 )?;
760 let captured = local_captured_env(&self.env).await;
761 let mut cmd = tokio::process::Command::new(resolve_program(command).as_ref());
762 cmd.args(args)
763 .envs(captured.iter().map(|(k, v)| (k.as_str(), v.as_str())))
765 .envs(env)
766 .stdin(std::process::Stdio::piped())
767 .stdout(std::process::Stdio::piped())
768 .stderr(std::process::Stdio::piped())
769 .hide_window()
770 .kill_on_drop(true);
771 if let Some(dir) = cwd {
772 cmd.current_dir(dir);
773 }
774
775 let post_spawn = match limits {
780 Some(lim) => lim
781 .apply_to_command(&mut cmd)
782 .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
783 None => PostSpawnAction::default(),
784 };
785
786 let child = cmd
787 .spawn()
788 .map_err(|e| SpawnError::Process(e.to_string()))?;
789 Ok(StdioChild::from_local_tokio_child(child, post_spawn))
790 }
791
792 async fn command_exists(&self, command: &str) -> bool {
793 let captured = local_captured_env(&self.env).await;
798 if let Some((_, path)) = captured.iter().find(|(k, _)| k == "PATH") {
799 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
800 return which::which_in(command, Some(path), &cwd).is_ok();
801 }
802 which::which(command).is_ok()
803 }
804}
805
806fn shell_quote(s: &str) -> String {
809 let mut out = String::with_capacity(s.len() + 2);
810 out.push('\'');
811 for c in s.chars() {
812 if c == '\'' {
813 out.push_str("'\\''");
814 } else {
815 out.push(c);
816 }
817 }
818 out.push('\'');
819 out
820}
821
822fn build_remote_exec(
828 env: &[(String, String)],
829 cwd: Option<&str>,
830 command: &str,
831 args: &[String],
832) -> String {
833 let mut s = String::new();
834 if let Some(dir) = cwd {
835 s.push_str("cd ");
836 s.push_str(&shell_quote(dir));
837 s.push_str(" && ");
838 }
839 s.push_str("exec ");
840 if !env.is_empty() {
841 s.push_str("env ");
842 for (k, v) in env {
843 s.push_str(k);
844 s.push('=');
845 s.push_str(&shell_quote(v));
846 s.push(' ');
847 }
848 }
849 s.push_str(&shell_quote(command));
850 for a in args {
851 s.push(' ');
852 s.push_str(&shell_quote(a));
853 }
854 s
855}
856
857fn build_remote_command_exists(env: &[(String, String)], command: &str) -> String {
861 let mut s = String::new();
862 for (k, v) in env {
863 s.push_str("export ");
864 s.push_str(k);
865 s.push('=');
866 s.push_str(&shell_quote(v));
867 s.push_str("; ");
868 }
869 s.push_str("command -v ");
870 s.push_str(&shell_quote(command));
871 s.push_str(" >/dev/null 2>&1");
872 s
873}
874
875fn build_ssh_args(
880 params: &crate::services::remote::ConnectionParams,
881 remote_cmd: &str,
882) -> Vec<String> {
883 let mut a = vec![
884 "-o".to_string(),
885 "StrictHostKeyChecking=accept-new".to_string(),
886 "-o".to_string(),
887 "BatchMode=yes".to_string(),
888 ];
889 if let Some(port) = params.port {
890 a.push("-p".to_string());
891 a.push(port.to_string());
892 }
893 if let Some(ref identity) = params.identity_file {
894 a.push("-i".to_string());
895 a.push(identity.to_string_lossy().into_owned());
896 }
897 a.extend(params.extra_args.iter().cloned());
898 a.push(params.ssh_target());
899 a.push(remote_cmd.to_string());
900 a
901}
902
903pub fn build_ssh_terminal_args(
918 params: &crate::services::remote::ConnectionParams,
919 remote_dir: Option<&str>,
920) -> Vec<String> {
921 let mut a = vec![
922 "-t".to_string(),
923 "-o".to_string(),
924 "StrictHostKeyChecking=accept-new".to_string(),
925 ];
926 if let Some(port) = params.port {
927 a.push("-p".to_string());
928 a.push(port.to_string());
929 }
930 if let Some(ref identity) = params.identity_file {
931 a.push("-i".to_string());
932 a.push(identity.to_string_lossy().into_owned());
933 }
934 a.extend(params.extra_args.iter().cloned());
935 a.push(params.ssh_target());
936
937 let mut remote_cmd = String::new();
944 if let Some(dir) = remote_dir.filter(|d| !d.is_empty()) {
945 let quoted = shell_quote(dir);
946 remote_cmd.push_str(&format!(
947 "d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
948 ));
949 }
950 remote_cmd.push_str(SSH_EXEC_LOGIN_SHELL);
951 a.push(remote_cmd);
952 a
953}
954
955pub const SSH_EXEC_LOGIN_SHELL: &str = "exec ${SHELL:-/bin/sh} -l";
960
961pub fn ssh_remote_env_launcher(recipe: &str) -> String {
980 use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
981
982 let recipe_json = serde_json::to_string(recipe).unwrap_or_else(|_| "\"\"".to_string());
983 let launcher_src = format!(
987 r#"import os,subprocess
988_r={recipe_json}
989_S="{sentinel}"
990_script="command env; printf '%s\\n' '"+_S+"'; "+_r+"; command env"
991try:
992 _o=subprocess.run(["bash","-lc",_script],stdout=subprocess.PIPE,stderr=subprocess.DEVNULL).stdout.decode("utf-8","replace")
993except Exception:
994 _o=""
995def _p(t):
996 d={{}}
997 for ln in t.splitlines():
998 i=ln.find("=")
999 if i>0: d[ln[:i]]=ln[i+1:]
1000 return d
1001if _S in _o:
1002 _b,_a=_o.split(_S,1)
1003 _bb=_p(_b); _aa=_p(_a)
1004 for k,v in _aa.items():
1005 if _bb.get(k)!=v: os.environ[k]=v
1006 for k in list(_bb):
1007 if k not in _aa: os.environ.pop(k,None)
1008_sh=os.environ.get("SHELL") or "/bin/sh"
1009os.execvp(_sh,[_sh,"-l"])
1010"#,
1011 sentinel = crate::services::env_provider::DELTA_SENTINEL,
1012 );
1013
1014 let b64 = BASE64.encode(launcher_src.as_bytes());
1015 format!("exec python3 -c 'import base64;exec(base64.b64decode(\"{b64}\").decode())'")
1016}
1017
1018pub fn build_kube_terminal_args(
1032 target: &crate::services::remote::KubeTarget,
1033 base_env: &[(String, String)],
1034) -> Vec<String> {
1035 let mut remote_cmd = String::new();
1036 for (k, v) in base_env {
1046 remote_cmd.push_str(&format!("export {}={}; ", k, shell_quote(v)));
1047 }
1048 if let Some(dir) = target.workspace.as_deref().filter(|d| !d.is_empty()) {
1049 let quoted = shell_quote(dir);
1050 remote_cmd.push_str(&format!(
1051 "d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
1052 ));
1053 }
1054 remote_cmd.push_str("exec ${SHELL:-/bin/sh} -l");
1055 crate::services::remote::transport::kubectl_exec_argv(
1056 target,
1057 &["-it"],
1058 "sh",
1059 &["-lc".to_string(), remote_cmd],
1060 )
1061}
1062
1063pub struct RemoteLongRunningSpawner {
1077 params: crate::services::remote::ConnectionParams,
1078 env: Arc<EnvProvider>,
1079 trust: Arc<WorkspaceTrust>,
1080}
1081
1082impl RemoteLongRunningSpawner {
1083 pub fn new(
1086 params: crate::services::remote::ConnectionParams,
1087 env: Arc<EnvProvider>,
1088 trust: Arc<WorkspaceTrust>,
1089 ) -> Self {
1090 Self { params, env, trust }
1091 }
1092
1093 async fn captured_env(&self) -> Vec<(String, String)> {
1096 let params = self.params.clone();
1097 self.env
1098 .current(move |script| async move {
1099 let ssh_args = build_ssh_args(¶ms, &script);
1100 let output = tokio::process::Command::new("ssh")
1101 .args(&ssh_args)
1102 .hide_window()
1103 .output()
1104 .await
1105 .ok()?;
1106 Some(String::from_utf8_lossy(&output.stdout).into_owned())
1107 })
1108 .await
1109 }
1110}
1111
1112#[async_trait::async_trait]
1113impl LongRunningSpawner for RemoteLongRunningSpawner {
1114 async fn spawn_stdio(
1115 &self,
1116 command: &str,
1117 args: &[String],
1118 env: Vec<(String, String)>,
1119 cwd: Option<&Path>,
1120 _limits: Option<&ProcessLimits>,
1121 ) -> Result<StdioChild, SpawnError> {
1122 let cwd_str = cwd.map(|p| p.to_string_lossy().into_owned());
1125 gate(&self.trust, command, cwd_str.as_deref())?;
1126
1127 let mut merged = self.captured_env().await;
1130 merged.extend(env);
1131
1132 let remote = build_remote_exec(&merged, cwd_str.as_deref(), command, args);
1133 let ssh_args = build_ssh_args(&self.params, &remote);
1134
1135 let mut cmd = tokio::process::Command::new("ssh");
1136 cmd.args(&ssh_args)
1137 .stdin(std::process::Stdio::piped())
1138 .stdout(std::process::Stdio::piped())
1139 .stderr(std::process::Stdio::piped())
1140 .hide_window()
1141 .kill_on_drop(true);
1142
1143 let child = cmd
1144 .spawn()
1145 .map_err(|e| SpawnError::Process(e.to_string()))?;
1146 Ok(StdioChild::from_tokio_child(child, false))
1149 }
1150
1151 async fn command_exists(&self, command: &str) -> bool {
1152 let captured = self.captured_env().await;
1153 let remote = build_remote_command_exists(&captured, command);
1154 let ssh_args = build_ssh_args(&self.params, &remote);
1155 match tokio::process::Command::new("ssh")
1156 .args(&ssh_args)
1157 .hide_window()
1158 .output()
1159 .await
1160 {
1161 Ok(output) => output.status.success(),
1162 Err(_) => false,
1163 }
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use super::*;
1170 use tokio::io::AsyncReadExt;
1171
1172 #[cfg(not(windows))]
1176 #[test]
1177 fn resolve_program_is_passthrough_on_unix() {
1178 assert_eq!(
1179 resolve_program("typescript-language-server"),
1180 "typescript-language-server"
1181 );
1182 assert_eq!(resolve_program("sh"), "sh");
1183 assert_eq!(resolve_program(""), "");
1184 }
1185
1186 #[cfg(windows)]
1190 #[test]
1191 fn resolve_program_falls_back_and_resolves_on_windows() {
1192 assert_eq!(
1193 resolve_program("fresh-unlikely-binary-name-ygzu9"),
1194 "fresh-unlikely-binary-name-ygzu9"
1195 );
1196 let resolved = resolve_program("cmd");
1198 assert!(
1199 std::path::Path::new(resolved.as_ref()).is_absolute(),
1200 "expected an absolute path, got {resolved:?}"
1201 );
1202 }
1203
1204 #[tokio::test]
1205 async fn test_local_spawner() {
1206 let spawner = LocalProcessSpawner::new(
1207 Arc::new(EnvProvider::inactive()),
1208 Arc::new(WorkspaceTrust::permissive()),
1209 );
1210 let result = spawner
1211 .spawn("echo".to_string(), vec!["hello".to_string()], None)
1212 .await
1213 .unwrap();
1214
1215 assert_eq!(result.exit_code, 0);
1216 assert!(result.stdout.trim() == "hello");
1217 }
1218
1219 #[tokio::test]
1220 async fn test_local_spawner_stdout_to_file() {
1221 let spawner = LocalProcessSpawner::new(
1222 Arc::new(EnvProvider::inactive()),
1223 Arc::new(WorkspaceTrust::permissive()),
1224 );
1225 let tmp =
1226 std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
1227 #[allow(clippy::let_underscore_must_use)]
1231 let _ = std::fs::remove_file(&tmp);
1232 let result = spawner
1233 .spawn_to_file(
1234 "echo".to_string(),
1235 vec!["hello-from-disk".to_string()],
1236 None,
1237 tmp.clone(),
1238 )
1239 .await
1240 .unwrap();
1241
1242 assert_eq!(result.exit_code, 0);
1243 assert!(
1244 result.stdout.is_empty(),
1245 "stdout should be empty when streaming"
1246 );
1247 let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
1248 assert_eq!(contents.trim(), "hello-from-disk");
1249 #[allow(clippy::let_underscore_must_use)]
1253 let _ = std::fs::remove_file(&tmp);
1254 }
1255
1256 #[tokio::test]
1257 async fn test_local_spawner_cancellable_kill() {
1258 let spawner = LocalProcessSpawner::new(
1259 Arc::new(EnvProvider::inactive()),
1260 Arc::new(WorkspaceTrust::permissive()),
1261 );
1262 let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
1263
1264 let task = tokio::spawn(async move {
1266 spawner
1267 .spawn_cancellable(
1268 "sleep".to_string(),
1269 vec!["30".to_string()],
1270 None,
1271 None,
1272 kill_rx,
1273 )
1274 .await
1275 });
1276
1277 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1278 #[allow(clippy::let_underscore_must_use)]
1284 let _ = kill_tx.send(());
1285
1286 let start = std::time::Instant::now();
1287 let result = task.await.unwrap().unwrap();
1288 let elapsed = start.elapsed();
1289
1290 assert!(
1293 elapsed < std::time::Duration::from_secs(5),
1294 "kill should be prompt, took {:?}",
1295 elapsed
1296 );
1297 assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
1298 }
1299
1300 #[tokio::test]
1301 async fn local_long_running_spawn_stdio_pipes_output() {
1302 let spawner = LocalLongRunningSpawner::new(
1303 Arc::new(EnvProvider::inactive()),
1304 Arc::new(WorkspaceTrust::permissive()),
1305 );
1306 let mut child = spawner
1307 .spawn_stdio(
1308 "sh",
1309 &["-c".into(), "echo hi".into()],
1310 Vec::new(),
1311 None,
1312 None,
1313 )
1314 .await
1315 .expect("spawn succeeds");
1316
1317 let mut stdout = child.take_stdout().expect("stdout piped");
1318 let mut buf = String::new();
1319 stdout.read_to_string(&mut buf).await.unwrap();
1320 assert_eq!(buf.trim(), "hi");
1321
1322 let status = child.wait().await.unwrap();
1323 assert!(status.success());
1324 assert!(child.spawned_locally());
1325 }
1326
1327 #[tokio::test]
1328 async fn local_long_running_command_exists_for_sh() {
1329 let spawner = LocalLongRunningSpawner::new(
1330 Arc::new(EnvProvider::inactive()),
1331 Arc::new(WorkspaceTrust::permissive()),
1332 );
1333 assert!(spawner.command_exists("sh").await);
1334 assert!(
1335 !spawner
1336 .command_exists("fresh-unlikely-binary-name-ygzu9")
1337 .await
1338 );
1339 }
1340
1341 #[cfg(unix)]
1345 #[tokio::test]
1346 async fn local_spawner_applies_active_env_provider() {
1347 let env = Arc::new(EnvProvider::inactive());
1350 env.set("export FRESH_ENV_TEST=hi-from-provider".into(), None);
1351 let spawner = LocalProcessSpawner::new(env, Arc::new(WorkspaceTrust::permissive()));
1352 let result = spawner
1353 .spawn(
1354 "sh".into(),
1355 vec!["-c".into(), "printf %s \"$FRESH_ENV_TEST\"".into()],
1356 None,
1357 )
1358 .await
1359 .unwrap();
1360 assert_eq!(result.exit_code, 0);
1361 assert_eq!(result.stdout, "hi-from-provider");
1362 }
1363
1364 #[tokio::test]
1365 async fn local_spawner_inactive_provider_injects_nothing() {
1366 let spawner = LocalProcessSpawner::new(
1367 Arc::new(EnvProvider::inactive()),
1368 Arc::new(WorkspaceTrust::permissive()),
1369 );
1370 let result = spawner
1371 .spawn(
1372 "sh".into(),
1373 vec!["-c".into(), "printf %s \"${FRESH_ENV_TEST:-unset}\"".into()],
1374 None,
1375 )
1376 .await
1377 .unwrap();
1378 assert_eq!(result.stdout, "unset");
1379 }
1380
1381 #[test]
1384 fn shell_quote_wraps_and_escapes() {
1385 assert_eq!(shell_quote("abc"), "'abc'");
1386 assert_eq!(shell_quote("a b/c"), "'a b/c'");
1387 assert_eq!(shell_quote("a'b"), "'a'\\''b'");
1388 }
1389
1390 #[test]
1391 fn build_remote_exec_with_cwd_and_env() {
1392 let env = vec![("VIRTUAL_ENV".to_string(), "/proj/.venv".to_string())];
1393 let s = build_remote_exec(&env, Some("/proj dir"), "python", &["x.py".to_string()]);
1394 assert_eq!(
1395 s,
1396 "cd '/proj dir' && exec env VIRTUAL_ENV='/proj/.venv' 'python' 'x.py'"
1397 );
1398 }
1399
1400 #[test]
1401 fn build_remote_exec_minimal() {
1402 assert_eq!(build_remote_exec(&[], None, "gopls", &[]), "exec 'gopls'");
1403 }
1404
1405 #[test]
1406 fn build_remote_command_exists_exports_env() {
1407 let env = vec![("PATH".to_string(), "/proj/.venv/bin:/usr/bin".to_string())];
1408 assert_eq!(
1409 build_remote_command_exists(&env, "pyright"),
1410 "export PATH='/proj/.venv/bin:/usr/bin'; command -v 'pyright' >/dev/null 2>&1"
1411 );
1412 }
1413
1414 #[test]
1415 fn build_ssh_args_full() {
1416 let params = crate::services::remote::ConnectionParams {
1417 user: Some("u".into()),
1418 host: "h".into(),
1419 port: Some(2222),
1420 identity_file: Some(std::path::PathBuf::from("/k")),
1421 extra_args: Vec::new(),
1422 };
1423 let a = build_ssh_args(¶ms, "echo hi");
1424 let expected: Vec<String> = [
1425 "-o",
1426 "StrictHostKeyChecking=accept-new",
1427 "-o",
1428 "BatchMode=yes",
1429 "-p",
1430 "2222",
1431 "-i",
1432 "/k",
1433 "u@h",
1434 "echo hi",
1435 ]
1436 .into_iter()
1437 .map(String::from)
1438 .collect();
1439 assert_eq!(a, expected);
1440 }
1441
1442 #[test]
1443 fn build_ssh_args_omits_user_and_threads_extra_args() {
1444 let params = crate::services::remote::ConnectionParams {
1446 user: None,
1447 host: "h".into(),
1448 port: None,
1449 identity_file: None,
1450 extra_args: vec!["-J".into(), "jump".into()],
1451 };
1452 let a = build_ssh_args(¶ms, "echo hi");
1453 let expected: Vec<String> = [
1454 "-o",
1455 "StrictHostKeyChecking=accept-new",
1456 "-o",
1457 "BatchMode=yes",
1458 "-J",
1459 "jump",
1460 "h",
1461 "echo hi",
1462 ]
1463 .into_iter()
1464 .map(String::from)
1465 .collect();
1466 assert_eq!(a, expected);
1467 }
1468
1469 #[test]
1470 fn build_ssh_terminal_args_forces_tty_and_login_shell() {
1471 let params = crate::services::remote::ConnectionParams {
1472 user: Some("u".into()),
1473 host: "h".into(),
1474 port: Some(2222),
1475 identity_file: Some(std::path::PathBuf::from("/k")),
1476 extra_args: Vec::new(),
1477 };
1478 let a = build_ssh_terminal_args(¶ms, Some("/proj dir"));
1479 let expected: Vec<String> = [
1480 "-t",
1481 "-o",
1482 "StrictHostKeyChecking=accept-new",
1483 "-p",
1484 "2222",
1485 "-i",
1486 "/k",
1487 "u@h",
1488 "d='/proj dir'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
1489 ]
1490 .into_iter()
1491 .map(String::from)
1492 .collect();
1493 assert_eq!(a, expected);
1494 assert!(!a.iter().any(|s| s == "BatchMode=yes"));
1496 assert!(a.last().unwrap().ends_with(SSH_EXEC_LOGIN_SHELL));
1498 }
1499
1500 #[test]
1501 fn ssh_remote_env_launcher_is_a_safe_single_quoted_python_oneliner() {
1502 use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
1503
1504 let recipe = "eval \"$(direnv export bash)\"";
1505 let launcher = ssh_remote_env_launcher(recipe);
1506
1507 assert!(launcher.starts_with("exec python3 -c '"));
1509 assert!(launcher.ends_with('\''));
1510 let inner = launcher
1514 .trim_start_matches("exec python3 -c '")
1515 .trim_end_matches('\'');
1516 assert!(
1517 !inner.contains('\''),
1518 "inner literal must not contain a single quote"
1519 );
1520
1521 let b64 = inner
1524 .trim_start_matches("import base64;exec(base64.b64decode(\"")
1525 .trim_end_matches("\").decode())");
1526 let src = String::from_utf8(BASE64.decode(b64).unwrap()).unwrap();
1527 assert!(
1528 src.contains("direnv export bash"),
1529 "recipe must be embedded"
1530 );
1531 assert!(src.contains(crate::services::env_provider::DELTA_SENTINEL));
1532 assert!(src.contains("os.execvp"));
1533 }
1534
1535 #[test]
1536 fn ssh_launcher_embeds_recipes_with_quotes_safely() {
1537 let recipe = "export X='a b'; source ./.venv/bin/activate";
1539 let launcher = ssh_remote_env_launcher(recipe);
1540 let inner = launcher
1541 .trim_start_matches("exec python3 -c '")
1542 .trim_end_matches('\'');
1543 assert!(
1544 !inner.contains('\''),
1545 "recipe quotes must be base64-encapsulated, never leak into the literal"
1546 );
1547 }
1548
1549 #[test]
1550 fn build_ssh_terminal_args_without_dir_skips_cd() {
1551 let params = crate::services::remote::ConnectionParams {
1552 user: Some("u".into()),
1553 host: "h".into(),
1554 port: None,
1555 identity_file: None,
1556 extra_args: Vec::new(),
1557 };
1558 let a = build_ssh_terminal_args(¶ms, None);
1559 assert_eq!(
1560 a,
1561 vec![
1562 "-t",
1563 "-o",
1564 "StrictHostKeyChecking=accept-new",
1565 "u@h",
1566 "exec ${SHELL:-/bin/sh} -l",
1567 ]
1568 );
1569 assert_eq!(build_ssh_terminal_args(¶ms, Some("")), a);
1571 }
1572
1573 #[test]
1574 fn build_kube_terminal_args_allocates_tty_and_pins_cwd() {
1575 let target = crate::services::remote::KubeTarget {
1576 context: Some("prod".into()),
1577 namespace: "dev".into(),
1578 pod: "pod-1".into(),
1579 container: Some("app".into()),
1580 workspace: Some("/workspace".into()),
1581 };
1582 let a = build_kube_terminal_args(&target, &[]);
1583 let expected: Vec<String> = [
1584 "--context",
1585 "prod",
1586 "exec",
1587 "-it",
1588 "-n",
1589 "dev",
1590 "-c",
1591 "app",
1592 "pod-1",
1593 "--",
1594 "sh",
1595 "-lc",
1596 "d='/workspace'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
1597 ]
1598 .into_iter()
1599 .map(String::from)
1600 .collect();
1601 assert_eq!(a, expected);
1602 }
1603
1604 #[test]
1605 fn build_kube_terminal_args_exports_base_env_before_login_shell() {
1606 let target = crate::services::remote::KubeTarget {
1607 context: None,
1608 namespace: "dev".into(),
1609 pod: "pod-1".into(),
1610 container: None,
1611 workspace: None,
1612 };
1613 let base_env = vec![
1614 ("VIRTUAL_ENV".to_string(), "/c/.venv".to_string()),
1615 ("MSG".to_string(), "a b".to_string()),
1616 ];
1617 let a = build_kube_terminal_args(&target, &base_env);
1618 assert_eq!(
1621 a.last().unwrap(),
1622 "export VIRTUAL_ENV='/c/.venv'; export MSG='a b'; exec ${SHELL:-/bin/sh} -l"
1623 );
1624 }
1625
1626 #[test]
1627 fn build_kube_terminal_args_without_workspace_skips_cd() {
1628 let target = crate::services::remote::KubeTarget {
1629 context: None,
1630 namespace: "dev".into(),
1631 pod: "pod-1".into(),
1632 container: None,
1633 workspace: None,
1634 };
1635 let a = build_kube_terminal_args(&target, &[]);
1636 assert_eq!(
1637 a,
1638 vec![
1639 "exec",
1640 "-it",
1641 "-n",
1642 "dev",
1643 "pod-1",
1644 "--",
1645 "sh",
1646 "-lc",
1647 "exec ${SHELL:-/bin/sh} -l",
1648 ]
1649 );
1650 }
1651}