use crate::services::env_provider::EnvProvider;
use crate::services::process_hidden::HideWindow;
use crate::services::process_limits::PostSpawnAction;
use crate::services::remote::channel::{AgentChannel, ChannelError};
use crate::services::remote::protocol::{decode_base64, exec_params};
use crate::services::workspace_trust::{gate, WorkspaceTrust};
use crate::types::ProcessLimits;
async fn local_captured_env(provider: &EnvProvider) -> Vec<(String, String)> {
provider
.current(|script| async move {
let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
let output = tokio::process::Command::new(&shell)
.arg("-lc")
.arg(&script)
.hide_window()
.output()
.await
.ok()?;
Some(String::from_utf8_lossy(&output.stdout).into_owned())
})
.await
}
use std::borrow::Cow;
use std::path::Path;
use std::process::ExitStatus;
use std::sync::Arc;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
#[cfg(windows)]
fn resolve_program(command: &str) -> Cow<'_, str> {
match which::which(command) {
Ok(path) => Cow::Owned(path.to_string_lossy().into_owned()),
Err(_) => Cow::Borrowed(command),
}
}
#[cfg(not(windows))]
fn resolve_program(command: &str) -> Cow<'_, str> {
Cow::Borrowed(command)
}
#[derive(Debug, Clone)]
pub struct SpawnResult {
pub stdout: String,
pub stderr: String,
pub exit_code: i32,
}
#[derive(Debug, thiserror::Error)]
pub enum SpawnError {
#[error("Channel error: {0}")]
Channel(#[from] ChannelError),
#[error("Process error: {0}")]
Process(String),
#[error("Decode error: {0}")]
Decode(String),
}
#[async_trait::async_trait]
pub trait ProcessSpawner: Send + Sync {
async fn spawn(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
) -> Result<SpawnResult, SpawnError>;
async fn spawn_to_file(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
stdout_to: std::path::PathBuf,
) -> Result<SpawnResult, SpawnError> {
let result = self.spawn(command, args, cwd).await?;
if result.exit_code == 0 || !result.stdout.is_empty() {
std::fs::write(&stdout_to, result.stdout.as_bytes())
.map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
}
Ok(SpawnResult {
stdout: String::new(),
stderr: result.stderr,
exit_code: result.exit_code,
})
}
async fn spawn_cancellable(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
stdout_to: Option<std::path::PathBuf>,
_kill_rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<SpawnResult, SpawnError> {
match stdout_to {
Some(p) => self.spawn_to_file(command, args, cwd, p).await,
None => self.spawn(command, args, cwd).await,
}
}
}
pub struct LocalProcessSpawner {
env: Arc<EnvProvider>,
trust: Arc<WorkspaceTrust>,
}
impl LocalProcessSpawner {
pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
Self { env, trust }
}
async fn apply_env(&self, cmd: &mut tokio::process::Command) {
let env = local_captured_env(&self.env).await;
if !env.is_empty() {
cmd.envs(env.iter().map(|(k, v)| (k.as_str(), v.as_str())));
}
}
}
#[async_trait::async_trait]
impl ProcessSpawner for LocalProcessSpawner {
async fn spawn(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
) -> Result<SpawnResult, SpawnError> {
gate(&self.trust, &command, cwd.as_deref())?;
let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
cmd.args(&args);
self.apply_env(&mut cmd).await;
cmd.hide_window();
if let Some(ref dir) = cwd {
cmd.current_dir(dir);
}
let output = cmd
.output()
.await
.map_err(|e| SpawnError::Process(e.to_string()))?;
Ok(SpawnResult {
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
exit_code: output.status.code().unwrap_or(-1),
})
}
async fn spawn_cancellable(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
stdout_to: Option<std::path::PathBuf>,
kill_rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<SpawnResult, SpawnError> {
use std::process::Stdio;
use tokio::io::AsyncReadExt;
gate(&self.trust, &command, cwd.as_deref())?;
let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
cmd.args(&args);
self.apply_env(&mut cmd).await;
cmd.hide_window();
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
if let Some(ref dir) = cwd {
cmd.current_dir(dir);
}
if let Some(ref path) = stdout_to {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
})?;
}
}
}
let mut child = cmd
.spawn()
.map_err(|e| SpawnError::Process(e.to_string()))?;
let mut child_stdout = child
.stdout
.take()
.ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
let mut child_stderr = child
.stderr
.take()
.ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
let stdout_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> = match stdout_to {
Some(path) => tokio::spawn(async move {
let mut file = tokio::fs::File::create(&path).await?;
tokio::io::copy(&mut child_stdout, &mut file).await?;
use tokio::io::AsyncWriteExt;
if let Err(e) = file.flush().await {
tracing::warn!("spawn_cancellable: file flush failed: {}", e);
}
if let Err(e) = file.sync_all().await {
tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
}
Ok(Vec::new())
}),
None => tokio::spawn(async move {
let mut buf = Vec::new();
child_stdout.read_to_end(&mut buf).await?;
Ok(buf)
}),
};
let stderr_task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>> =
tokio::spawn(async move {
let mut buf = Vec::new();
child_stderr.read_to_end(&mut buf).await?;
Ok(buf)
});
let exit_code = tokio::select! {
status = child.wait() => status
.map(|s| s.code().unwrap_or(-1))
.unwrap_or(-1),
_ = kill_rx => {
if let Err(e) = child.start_kill() {
tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
}
child.wait().await.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1)
}
};
let stdout_bytes = stdout_task
.await
.map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
.map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
let stderr_bytes = stderr_task
.await
.map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
.map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
Ok(SpawnResult {
stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
exit_code,
})
}
async fn spawn_to_file(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
stdout_to: std::path::PathBuf,
) -> Result<SpawnResult, SpawnError> {
use std::process::Stdio;
use tokio::io::AsyncWriteExt;
gate(&self.trust, &command, cwd.as_deref())?;
let mut cmd = tokio::process::Command::new(resolve_program(&command).as_ref());
cmd.args(&args);
self.apply_env(&mut cmd).await;
cmd.hide_window();
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
if let Some(ref dir) = cwd {
cmd.current_dir(dir);
}
if let Some(parent) = stdout_to.parent() {
if !parent.as_os_str().is_empty() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
})?;
}
}
let mut file = tokio::fs::File::create(&stdout_to)
.await
.map_err(|e| SpawnError::Process(format!("create {:?}: {}", stdout_to, e)))?;
let mut child = cmd
.spawn()
.map_err(|e| SpawnError::Process(e.to_string()))?;
let mut child_stdout = child
.stdout
.take()
.ok_or_else(|| SpawnError::Process("child stdout missing".to_string()))?;
let mut child_stderr = child
.stderr
.take()
.ok_or_else(|| SpawnError::Process("child stderr missing".to_string()))?;
let stdout_task = tokio::spawn(async move {
let res = tokio::io::copy(&mut child_stdout, &mut file).await;
if let Err(e) = file.flush().await {
tracing::warn!("spawn_to_file: file flush failed: {}", e);
}
if let Err(e) = file.sync_all().await {
tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
}
res
});
let stderr_task = tokio::spawn(async move {
let mut buf = Vec::new();
let res = tokio::io::copy(&mut child_stderr, &mut buf).await;
res.map(|_| buf)
});
let status = child
.wait()
.await
.map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;
stdout_task
.await
.map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
.map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
let stderr_bytes = stderr_task
.await
.map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
.map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;
Ok(SpawnResult {
stdout: String::new(),
stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
exit_code: status.code().unwrap_or(-1),
})
}
}
fn env_wrap(env: &[(String, String)], command: &str, args: &[String]) -> (String, Vec<String>) {
if env.is_empty() {
return (command.to_string(), args.to_vec());
}
let mut wrapped = Vec::with_capacity(env.len() + 1 + args.len());
for (k, v) in env {
wrapped.push(format!("{k}={v}"));
}
wrapped.push(command.to_string());
wrapped.extend(args.iter().cloned());
("env".to_string(), wrapped)
}
pub struct RemoteProcessSpawner {
channel: Arc<AgentChannel>,
env: Arc<EnvProvider>,
trust: Arc<WorkspaceTrust>,
}
impl RemoteProcessSpawner {
pub fn new(
channel: Arc<AgentChannel>,
env: Arc<EnvProvider>,
trust: Arc<WorkspaceTrust>,
) -> Self {
Self {
channel,
env,
trust,
}
}
async fn captured_env(&self) -> Vec<(String, String)> {
let channel = self.channel.clone();
self.env
.current(move |script| async move {
let params = exec_params("sh", &["-lc".to_string(), script], None);
let (mut data_rx, _result) =
channel.request_streaming("exec", params).await.ok()?;
let mut stdout = Vec::new();
while let Some(d) = data_rx.recv().await {
if let Some(out) = d.get("out").and_then(|v| v.as_str()) {
if let Ok(b) = decode_base64(out) {
stdout.extend_from_slice(&b);
}
}
}
Some(String::from_utf8_lossy(&stdout).into_owned())
})
.await
}
}
#[async_trait::async_trait]
impl ProcessSpawner for RemoteProcessSpawner {
async fn spawn(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
) -> Result<SpawnResult, SpawnError> {
gate(&self.trust, &command, cwd.as_deref())?;
let captured = self.captured_env().await;
let (eff_cmd, eff_args) = env_wrap(&captured, &command, &args);
let params = exec_params(&eff_cmd, &eff_args, cwd.as_deref());
let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
let mut stdout = Vec::new();
let mut stderr = Vec::new();
while let Some(data) = data_rx.recv().await {
if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
if let Ok(decoded) = decode_base64(out) {
stdout.extend_from_slice(&decoded);
}
}
if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
if let Ok(decoded) = decode_base64(err) {
stderr.extend_from_slice(&decoded);
}
}
}
let result = result_rx
.await
.map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
.map_err(SpawnError::Process)?;
let exit_code = result
.get("code")
.and_then(|v| v.as_i64())
.map(|c| c as i32)
.unwrap_or(-1);
Ok(SpawnResult {
stdout: String::from_utf8_lossy(&stdout).to_string(),
stderr: String::from_utf8_lossy(&stderr).to_string(),
exit_code,
})
}
async fn spawn_to_file(
&self,
_command: String,
_args: Vec<String>,
_cwd: Option<String>,
_stdout_to: std::path::PathBuf,
) -> Result<SpawnResult, SpawnError> {
Err(SpawnError::Process(
"stdoutTo is not supported for remote processes".to_string(),
))
}
}
pub struct StdioChild {
inner: tokio::process::Child,
stdin: Option<ChildStdin>,
stdout: Option<ChildStdout>,
stderr: Option<ChildStderr>,
spawned_locally: bool,
}
impl StdioChild {
pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
let stdin = child.stdin.take();
let stdout = child.stdout.take();
let stderr = child.stderr.take();
Self {
inner: child,
stdin,
stdout,
stderr,
spawned_locally,
}
}
pub fn from_local_tokio_child(
child: tokio::process::Child,
post_spawn: PostSpawnAction,
) -> Self {
let out = Self::from_tokio_child(child, true);
if let Some(pid) = out.inner.id() {
post_spawn.apply_to_child(pid);
}
out
}
pub fn take_stdin(&mut self) -> Option<ChildStdin> {
self.stdin.take()
}
pub fn take_stdout(&mut self) -> Option<ChildStdout> {
self.stdout.take()
}
pub fn take_stderr(&mut self) -> Option<ChildStderr> {
self.stderr.take()
}
pub fn id(&self) -> Option<u32> {
self.inner.id()
}
pub fn spawned_locally(&self) -> bool {
self.spawned_locally
}
pub async fn kill(&mut self) -> std::io::Result<()> {
self.inner.kill().await
}
pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
self.inner.wait().await
}
}
#[async_trait::async_trait]
pub trait LongRunningSpawner: Send + Sync {
async fn spawn_stdio(
&self,
command: &str,
args: &[String],
env: Vec<(String, String)>,
cwd: Option<&Path>,
limits: Option<&ProcessLimits>,
) -> Result<StdioChild, SpawnError>;
async fn command_exists(&self, command: &str) -> bool;
}
pub struct LocalLongRunningSpawner {
env: Arc<EnvProvider>,
trust: Arc<WorkspaceTrust>,
}
impl LocalLongRunningSpawner {
pub fn new(env: Arc<EnvProvider>, trust: Arc<WorkspaceTrust>) -> Self {
Self { env, trust }
}
}
#[async_trait::async_trait]
impl LongRunningSpawner for LocalLongRunningSpawner {
async fn spawn_stdio(
&self,
command: &str,
args: &[String],
env: Vec<(String, String)>,
cwd: Option<&Path>,
limits: Option<&ProcessLimits>,
) -> Result<StdioChild, SpawnError> {
gate(
&self.trust,
command,
cwd.map(|p| p.to_string_lossy()).as_deref(),
)?;
let captured = local_captured_env(&self.env).await;
let mut cmd = tokio::process::Command::new(resolve_program(command).as_ref());
cmd.args(args)
.envs(captured.iter().map(|(k, v)| (k.as_str(), v.as_str())))
.envs(env)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.hide_window()
.kill_on_drop(true);
if let Some(dir) = cwd {
cmd.current_dir(dir);
}
let post_spawn = match limits {
Some(lim) => lim
.apply_to_command(&mut cmd)
.map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?,
None => PostSpawnAction::default(),
};
let child = cmd
.spawn()
.map_err(|e| SpawnError::Process(e.to_string()))?;
Ok(StdioChild::from_local_tokio_child(child, post_spawn))
}
async fn command_exists(&self, command: &str) -> bool {
let captured = local_captured_env(&self.env).await;
if let Some((_, path)) = captured.iter().find(|(k, _)| k == "PATH") {
let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
return which::which_in(command, Some(path), &cwd).is_ok();
}
which::which(command).is_ok()
}
}
fn shell_quote(s: &str) -> String {
let mut out = String::with_capacity(s.len() + 2);
out.push('\'');
for c in s.chars() {
if c == '\'' {
out.push_str("'\\''");
} else {
out.push(c);
}
}
out.push('\'');
out
}
fn build_remote_exec(
env: &[(String, String)],
cwd: Option<&str>,
command: &str,
args: &[String],
) -> String {
let mut s = String::new();
if let Some(dir) = cwd {
s.push_str("cd ");
s.push_str(&shell_quote(dir));
s.push_str(" && ");
}
s.push_str("exec ");
if !env.is_empty() {
s.push_str("env ");
for (k, v) in env {
s.push_str(k);
s.push('=');
s.push_str(&shell_quote(v));
s.push(' ');
}
}
s.push_str(&shell_quote(command));
for a in args {
s.push(' ');
s.push_str(&shell_quote(a));
}
s
}
fn build_remote_command_exists(env: &[(String, String)], command: &str) -> String {
let mut s = String::new();
for (k, v) in env {
s.push_str("export ");
s.push_str(k);
s.push('=');
s.push_str(&shell_quote(v));
s.push_str("; ");
}
s.push_str("command -v ");
s.push_str(&shell_quote(command));
s.push_str(" >/dev/null 2>&1");
s
}
fn build_ssh_args(
params: &crate::services::remote::ConnectionParams,
remote_cmd: &str,
) -> Vec<String> {
let mut a = vec![
"-o".to_string(),
"StrictHostKeyChecking=accept-new".to_string(),
"-o".to_string(),
"BatchMode=yes".to_string(),
];
if let Some(port) = params.port {
a.push("-p".to_string());
a.push(port.to_string());
}
if let Some(ref identity) = params.identity_file {
a.push("-i".to_string());
a.push(identity.to_string_lossy().into_owned());
}
a.extend(params.extra_args.iter().cloned());
a.push(params.ssh_target());
a.push(remote_cmd.to_string());
a
}
pub fn build_ssh_terminal_args(
params: &crate::services::remote::ConnectionParams,
remote_dir: Option<&str>,
) -> Vec<String> {
build_ssh_remote_args(params, remote_dir, SSH_EXEC_LOGIN_SHELL)
}
pub fn build_ssh_agent_terminal_args(
params: &crate::services::remote::ConnectionParams,
remote_dir: Option<&str>,
argv: &[String],
) -> Vec<String> {
build_ssh_remote_args(params, remote_dir, &agent_login_exec_tail(argv))
}
fn build_ssh_remote_args(
params: &crate::services::remote::ConnectionParams,
remote_dir: Option<&str>,
exec_tail: &str,
) -> Vec<String> {
let mut a = vec![
"-t".to_string(),
"-o".to_string(),
"StrictHostKeyChecking=accept-new".to_string(),
];
if let Some(port) = params.port {
a.push("-p".to_string());
a.push(port.to_string());
}
if let Some(ref identity) = params.identity_file {
a.push("-i".to_string());
a.push(identity.to_string_lossy().into_owned());
}
a.extend(params.extra_args.iter().cloned());
a.push(params.ssh_target());
let mut remote_cmd = String::new();
if let Some(dir) = remote_dir.filter(|d| !d.is_empty()) {
let quoted = shell_quote(dir);
remote_cmd.push_str(&format!(
"d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
));
}
remote_cmd.push_str(exec_tail);
a.push(remote_cmd);
a
}
fn agent_login_exec_tail(argv: &[String]) -> String {
let joined = argv
.iter()
.map(|a| shell_quote(a))
.collect::<Vec<_>>()
.join(" ");
format!(
"exec ${{SHELL:-/bin/sh}} -lc {}",
shell_quote(&format!("exec {joined}"))
)
}
pub const SSH_EXEC_LOGIN_SHELL: &str = "exec ${SHELL:-/bin/sh} -l";
pub fn ssh_remote_env_launcher(recipe: &str) -> String {
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
let recipe_json = serde_json::to_string(recipe).unwrap_or_else(|_| "\"\"".to_string());
let launcher_src = format!(
r#"import os,subprocess
_r={recipe_json}
_S="{sentinel}"
_script="command env; printf '%s\\n' '"+_S+"'; "+_r+"; command env"
try:
_o=subprocess.run(["bash","-lc",_script],stdout=subprocess.PIPE,stderr=subprocess.DEVNULL).stdout.decode("utf-8","replace")
except Exception:
_o=""
def _p(t):
d={{}}
for ln in t.splitlines():
i=ln.find("=")
if i>0: d[ln[:i]]=ln[i+1:]
return d
if _S in _o:
_b,_a=_o.split(_S,1)
_bb=_p(_b); _aa=_p(_a)
for k,v in _aa.items():
if _bb.get(k)!=v: os.environ[k]=v
for k in list(_bb):
if k not in _aa: os.environ.pop(k,None)
_sh=os.environ.get("SHELL") or "/bin/sh"
os.execvp(_sh,[_sh,"-l"])
"#,
sentinel = crate::services::env_provider::DELTA_SENTINEL,
);
let b64 = BASE64.encode(launcher_src.as_bytes());
format!("exec python3 -c 'import base64;exec(base64.b64decode(\"{b64}\").decode())'")
}
pub fn build_kube_terminal_args(
target: &crate::services::remote::KubeTarget,
base_env: &[(String, String)],
) -> Vec<String> {
build_kube_remote_args(target, base_env, "exec ${SHELL:-/bin/sh} -l")
}
pub fn build_kube_agent_terminal_args(
target: &crate::services::remote::KubeTarget,
base_env: &[(String, String)],
argv: &[String],
) -> Vec<String> {
build_kube_remote_args(target, base_env, &agent_login_exec_tail(argv))
}
fn build_kube_remote_args(
target: &crate::services::remote::KubeTarget,
base_env: &[(String, String)],
exec_tail: &str,
) -> Vec<String> {
let mut remote_cmd = String::new();
for (k, v) in base_env {
remote_cmd.push_str(&format!("export {}={}; ", k, shell_quote(v)));
}
if let Some(dir) = target.workspace.as_deref().filter(|d| !d.is_empty()) {
let quoted = shell_quote(dir);
remote_cmd.push_str(&format!(
"d={quoted}; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; "
));
}
remote_cmd.push_str(exec_tail);
crate::services::remote::transport::kubectl_exec_argv(
target,
&["-it"],
"sh",
&["-lc".to_string(), remote_cmd],
)
}
pub struct RemoteLongRunningSpawner {
params: crate::services::remote::ConnectionParams,
env: Arc<EnvProvider>,
trust: Arc<WorkspaceTrust>,
}
impl RemoteLongRunningSpawner {
pub fn new(
params: crate::services::remote::ConnectionParams,
env: Arc<EnvProvider>,
trust: Arc<WorkspaceTrust>,
) -> Self {
Self { params, env, trust }
}
async fn captured_env(&self) -> Vec<(String, String)> {
let params = self.params.clone();
self.env
.current(move |script| async move {
let ssh_args = build_ssh_args(¶ms, &script);
let output = tokio::process::Command::new("ssh")
.args(&ssh_args)
.hide_window()
.output()
.await
.ok()?;
Some(String::from_utf8_lossy(&output.stdout).into_owned())
})
.await
}
}
#[async_trait::async_trait]
impl LongRunningSpawner for RemoteLongRunningSpawner {
async fn spawn_stdio(
&self,
command: &str,
args: &[String],
env: Vec<(String, String)>,
cwd: Option<&Path>,
_limits: Option<&ProcessLimits>,
) -> Result<StdioChild, SpawnError> {
let cwd_str = cwd.map(|p| p.to_string_lossy().into_owned());
gate(&self.trust, command, cwd_str.as_deref())?;
let mut merged = self.captured_env().await;
merged.extend(env);
let remote = build_remote_exec(&merged, cwd_str.as_deref(), command, args);
let ssh_args = build_ssh_args(&self.params, &remote);
let mut cmd = tokio::process::Command::new("ssh");
cmd.args(&ssh_args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.hide_window()
.kill_on_drop(true);
let child = cmd
.spawn()
.map_err(|e| SpawnError::Process(e.to_string()))?;
Ok(StdioChild::from_tokio_child(child, false))
}
async fn command_exists(&self, command: &str) -> bool {
let captured = self.captured_env().await;
let remote = build_remote_command_exists(&captured, command);
let ssh_args = build_ssh_args(&self.params, &remote);
match tokio::process::Command::new("ssh")
.args(&ssh_args)
.hide_window()
.output()
.await
{
Ok(output) => output.status.success(),
Err(_) => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncReadExt;
#[cfg(not(windows))]
#[test]
fn resolve_program_is_passthrough_on_unix() {
assert_eq!(
resolve_program("typescript-language-server"),
"typescript-language-server"
);
assert_eq!(resolve_program("sh"), "sh");
assert_eq!(resolve_program(""), "");
}
#[cfg(windows)]
#[test]
fn resolve_program_falls_back_and_resolves_on_windows() {
assert_eq!(
resolve_program("fresh-unlikely-binary-name-ygzu9"),
"fresh-unlikely-binary-name-ygzu9"
);
let resolved = resolve_program("cmd");
assert!(
std::path::Path::new(resolved.as_ref()).is_absolute(),
"expected an absolute path, got {resolved:?}"
);
}
#[tokio::test]
async fn test_local_spawner() {
let spawner = LocalProcessSpawner::new(
Arc::new(EnvProvider::inactive()),
Arc::new(WorkspaceTrust::permissive()),
);
let result = spawner
.spawn("echo".to_string(), vec!["hello".to_string()], None)
.await
.unwrap();
assert_eq!(result.exit_code, 0);
assert!(result.stdout.trim() == "hello");
}
#[tokio::test]
async fn test_local_spawner_stdout_to_file() {
let spawner = LocalProcessSpawner::new(
Arc::new(EnvProvider::inactive()),
Arc::new(WorkspaceTrust::permissive()),
);
let tmp =
std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
#[allow(clippy::let_underscore_must_use)]
let _ = std::fs::remove_file(&tmp);
let result = spawner
.spawn_to_file(
"echo".to_string(),
vec!["hello-from-disk".to_string()],
None,
tmp.clone(),
)
.await
.unwrap();
assert_eq!(result.exit_code, 0);
assert!(
result.stdout.is_empty(),
"stdout should be empty when streaming"
);
let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
assert_eq!(contents.trim(), "hello-from-disk");
#[allow(clippy::let_underscore_must_use)]
let _ = std::fs::remove_file(&tmp);
}
#[tokio::test]
async fn test_local_spawner_cancellable_kill() {
let spawner = LocalProcessSpawner::new(
Arc::new(EnvProvider::inactive()),
Arc::new(WorkspaceTrust::permissive()),
);
let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
let task = tokio::spawn(async move {
spawner
.spawn_cancellable(
"sleep".to_string(),
vec!["30".to_string()],
None,
None,
kill_rx,
)
.await
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
#[allow(clippy::let_underscore_must_use)]
let _ = kill_tx.send(());
let start = std::time::Instant::now();
let result = task.await.unwrap().unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < std::time::Duration::from_secs(5),
"kill should be prompt, took {:?}",
elapsed
);
assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
}
#[tokio::test]
async fn local_long_running_spawn_stdio_pipes_output() {
let spawner = LocalLongRunningSpawner::new(
Arc::new(EnvProvider::inactive()),
Arc::new(WorkspaceTrust::permissive()),
);
let mut child = spawner
.spawn_stdio(
"sh",
&["-c".into(), "echo hi".into()],
Vec::new(),
None,
None,
)
.await
.expect("spawn succeeds");
let mut stdout = child.take_stdout().expect("stdout piped");
let mut buf = String::new();
stdout.read_to_string(&mut buf).await.unwrap();
assert_eq!(buf.trim(), "hi");
let status = child.wait().await.unwrap();
assert!(status.success());
assert!(child.spawned_locally());
}
#[tokio::test]
async fn local_long_running_command_exists_for_sh() {
let spawner = LocalLongRunningSpawner::new(
Arc::new(EnvProvider::inactive()),
Arc::new(WorkspaceTrust::permissive()),
);
assert!(spawner.command_exists("sh").await);
assert!(
!spawner
.command_exists("fresh-unlikely-binary-name-ygzu9")
.await
);
}
#[cfg(unix)]
#[tokio::test]
async fn local_spawner_applies_active_env_provider() {
let env = Arc::new(EnvProvider::inactive());
env.set("export FRESH_ENV_TEST=hi-from-provider".into(), None);
let spawner = LocalProcessSpawner::new(env, Arc::new(WorkspaceTrust::permissive()));
let result = spawner
.spawn(
"sh".into(),
vec!["-c".into(), "printf %s \"$FRESH_ENV_TEST\"".into()],
None,
)
.await
.unwrap();
assert_eq!(result.exit_code, 0);
assert_eq!(result.stdout, "hi-from-provider");
}
#[tokio::test]
async fn local_spawner_inactive_provider_injects_nothing() {
let spawner = LocalProcessSpawner::new(
Arc::new(EnvProvider::inactive()),
Arc::new(WorkspaceTrust::permissive()),
);
let result = spawner
.spawn(
"sh".into(),
vec!["-c".into(), "printf %s \"${FRESH_ENV_TEST:-unset}\"".into()],
None,
)
.await
.unwrap();
assert_eq!(result.stdout, "unset");
}
#[test]
fn shell_quote_wraps_and_escapes() {
assert_eq!(shell_quote("abc"), "'abc'");
assert_eq!(shell_quote("a b/c"), "'a b/c'");
assert_eq!(shell_quote("a'b"), "'a'\\''b'");
}
#[test]
fn build_remote_exec_with_cwd_and_env() {
let env = vec![("VIRTUAL_ENV".to_string(), "/proj/.venv".to_string())];
let s = build_remote_exec(&env, Some("/proj dir"), "python", &["x.py".to_string()]);
assert_eq!(
s,
"cd '/proj dir' && exec env VIRTUAL_ENV='/proj/.venv' 'python' 'x.py'"
);
}
#[test]
fn build_remote_exec_minimal() {
assert_eq!(build_remote_exec(&[], None, "gopls", &[]), "exec 'gopls'");
}
#[test]
fn build_remote_command_exists_exports_env() {
let env = vec![("PATH".to_string(), "/proj/.venv/bin:/usr/bin".to_string())];
assert_eq!(
build_remote_command_exists(&env, "pyright"),
"export PATH='/proj/.venv/bin:/usr/bin'; command -v 'pyright' >/dev/null 2>&1"
);
}
#[test]
fn build_ssh_args_full() {
let params = crate::services::remote::ConnectionParams {
user: Some("u".into()),
host: "h".into(),
port: Some(2222),
identity_file: Some(std::path::PathBuf::from("/k")),
extra_args: Vec::new(),
};
let a = build_ssh_args(¶ms, "echo hi");
let expected: Vec<String> = [
"-o",
"StrictHostKeyChecking=accept-new",
"-o",
"BatchMode=yes",
"-p",
"2222",
"-i",
"/k",
"u@h",
"echo hi",
]
.into_iter()
.map(String::from)
.collect();
assert_eq!(a, expected);
}
#[test]
fn build_ssh_args_omits_user_and_threads_extra_args() {
let params = crate::services::remote::ConnectionParams {
user: None,
host: "h".into(),
port: None,
identity_file: None,
extra_args: vec!["-J".into(), "jump".into()],
};
let a = build_ssh_args(¶ms, "echo hi");
let expected: Vec<String> = [
"-o",
"StrictHostKeyChecking=accept-new",
"-o",
"BatchMode=yes",
"-J",
"jump",
"h",
"echo hi",
]
.into_iter()
.map(String::from)
.collect();
assert_eq!(a, expected);
}
#[test]
fn build_ssh_terminal_args_forces_tty_and_login_shell() {
let params = crate::services::remote::ConnectionParams {
user: Some("u".into()),
host: "h".into(),
port: Some(2222),
identity_file: Some(std::path::PathBuf::from("/k")),
extra_args: Vec::new(),
};
let a = build_ssh_terminal_args(¶ms, Some("/proj dir"));
let expected: Vec<String> = [
"-t",
"-o",
"StrictHostKeyChecking=accept-new",
"-p",
"2222",
"-i",
"/k",
"u@h",
"d='/proj dir'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
]
.into_iter()
.map(String::from)
.collect();
assert_eq!(a, expected);
assert!(!a.iter().any(|s| s == "BatchMode=yes"));
assert!(a.last().unwrap().ends_with(SSH_EXEC_LOGIN_SHELL));
}
#[test]
fn ssh_remote_env_launcher_is_a_safe_single_quoted_python_oneliner() {
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
let recipe = "eval \"$(direnv export bash)\"";
let launcher = ssh_remote_env_launcher(recipe);
assert!(launcher.starts_with("exec python3 -c '"));
assert!(launcher.ends_with('\''));
let inner = launcher
.trim_start_matches("exec python3 -c '")
.trim_end_matches('\'');
assert!(
!inner.contains('\''),
"inner literal must not contain a single quote"
);
let b64 = inner
.trim_start_matches("import base64;exec(base64.b64decode(\"")
.trim_end_matches("\").decode())");
let src = String::from_utf8(BASE64.decode(b64).unwrap()).unwrap();
assert!(
src.contains("direnv export bash"),
"recipe must be embedded"
);
assert!(src.contains(crate::services::env_provider::DELTA_SENTINEL));
assert!(src.contains("os.execvp"));
}
#[test]
fn ssh_launcher_embeds_recipes_with_quotes_safely() {
let recipe = "export X='a b'; source ./.venv/bin/activate";
let launcher = ssh_remote_env_launcher(recipe);
let inner = launcher
.trim_start_matches("exec python3 -c '")
.trim_end_matches('\'');
assert!(
!inner.contains('\''),
"recipe quotes must be base64-encapsulated, never leak into the literal"
);
}
#[test]
fn build_ssh_terminal_args_without_dir_skips_cd() {
let params = crate::services::remote::ConnectionParams {
user: Some("u".into()),
host: "h".into(),
port: None,
identity_file: None,
extra_args: Vec::new(),
};
let a = build_ssh_terminal_args(¶ms, None);
assert_eq!(
a,
vec![
"-t",
"-o",
"StrictHostKeyChecking=accept-new",
"u@h",
"exec ${SHELL:-/bin/sh} -l",
]
);
assert_eq!(build_ssh_terminal_args(¶ms, Some("")), a);
}
#[test]
fn build_ssh_agent_terminal_args_runs_agent_in_remote_workspace() {
let params = crate::services::remote::ConnectionParams {
user: Some("u".into()),
host: "h".into(),
port: Some(2222),
identity_file: Some(std::path::PathBuf::from("/k")),
extra_args: Vec::new(),
};
let argv = vec![
"claude".to_string(),
"--resume".to_string(),
"u-1".to_string(),
];
let a = build_ssh_agent_terminal_args(¶ms, Some("/srv/proj"), &argv);
assert_eq!(
&a[..8],
&[
"-t",
"-o",
"StrictHostKeyChecking=accept-new",
"-p",
"2222",
"-i",
"/k",
"u@h",
]
);
let remote_cmd = a.last().unwrap();
assert!(remote_cmd.contains("cd \"$d\"") && remote_cmd.contains("'/srv/proj'"));
assert!(remote_cmd.contains("exec ${SHELL:-/bin/sh} -lc "));
assert!(
remote_cmd.contains("claude")
&& remote_cmd.contains("--resume")
&& remote_cmd.contains("u-1")
);
}
#[test]
fn build_ssh_agent_terminal_args_quotes_args_with_spaces() {
let params = crate::services::remote::ConnectionParams {
user: None,
host: "h".into(),
port: None,
identity_file: None,
extra_args: Vec::new(),
};
let argv = vec!["agent".to_string(), "a b".to_string()];
let remote_cmd = build_ssh_agent_terminal_args(¶ms, None, &argv)
.pop()
.unwrap();
assert!(remote_cmd.contains("'a b'"));
}
#[test]
fn build_kube_agent_terminal_args_runs_agent_in_pod_workspace() {
let target = crate::services::remote::KubeTarget {
context: None,
namespace: "dev".into(),
pod: "pod-1".into(),
container: None,
workspace: Some("/workspace".into()),
};
let argv = vec![
"claude".to_string(),
"--resume".to_string(),
"u-1".to_string(),
];
let a = build_kube_agent_terminal_args(&target, &[], &argv);
assert_eq!(a[0], "exec");
assert!(a.contains(&"-it".to_string()));
assert!(a.contains(&"sh".to_string()) && a.contains(&"-lc".to_string()));
let remote_cmd = a.last().unwrap();
assert!(remote_cmd.contains("cd \"$d\"") && remote_cmd.contains("'/workspace'"));
assert!(remote_cmd.contains("exec ${SHELL:-/bin/sh} -lc "));
assert!(remote_cmd.contains("claude"));
}
#[test]
fn build_kube_terminal_args_allocates_tty_and_pins_cwd() {
let target = crate::services::remote::KubeTarget {
context: Some("prod".into()),
namespace: "dev".into(),
pod: "pod-1".into(),
container: Some("app".into()),
workspace: Some("/workspace".into()),
};
let a = build_kube_terminal_args(&target, &[]);
let expected: Vec<String> = [
"--context",
"prod",
"exec",
"-it",
"-n",
"dev",
"-c",
"app",
"pod-1",
"--",
"sh",
"-lc",
"d='/workspace'; [ -d \"$d\" ] || d=$(dirname \"$d\"); cd \"$d\" 2>/dev/null; exec ${SHELL:-/bin/sh} -l",
]
.into_iter()
.map(String::from)
.collect();
assert_eq!(a, expected);
}
#[test]
fn build_kube_terminal_args_exports_base_env_before_login_shell() {
let target = crate::services::remote::KubeTarget {
context: None,
namespace: "dev".into(),
pod: "pod-1".into(),
container: None,
workspace: None,
};
let base_env = vec![
("VIRTUAL_ENV".to_string(), "/c/.venv".to_string()),
("MSG".to_string(), "a b".to_string()),
];
let a = build_kube_terminal_args(&target, &base_env);
assert_eq!(
a.last().unwrap(),
"export VIRTUAL_ENV='/c/.venv'; export MSG='a b'; exec ${SHELL:-/bin/sh} -l"
);
}
#[test]
fn build_kube_terminal_args_without_workspace_skips_cd() {
let target = crate::services::remote::KubeTarget {
context: None,
namespace: "dev".into(),
pod: "pod-1".into(),
container: None,
workspace: None,
};
let a = build_kube_terminal_args(&target, &[]);
assert_eq!(
a,
vec![
"exec",
"-it",
"-n",
"dev",
"pod-1",
"--",
"sh",
"-lc",
"exec ${SHELL:-/bin/sh} -l",
]
);
}
}