use crate::services::process_hidden::HideWindow;
use crate::services::process_limits::PostSpawnAction;
use crate::services::remote::channel::{AgentChannel, ChannelError};
use crate::services::remote::protocol::{decode_base64, exec_params};
use crate::types::ProcessLimits;
use std::path::Path;
use std::process::ExitStatus;
use std::sync::Arc;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
/// Outcome of a process that was run to completion by a [`ProcessSpawner`].
#[derive(Debug, Clone)]
pub struct SpawnResult {
/// Captured standard output as text. The spawners in this file decode it as
/// lossy UTF-8 and leave it empty when stdout was streamed to a file instead.
pub stdout: String,
/// Captured standard error as text (lossy UTF-8 in the spawners in this file).
pub stderr: String,
/// Exit code of the process. The spawners in this file report `-1` when no
/// exit code is available.
pub exit_code: i32,
}
/// Errors returned by the spawners in this module.
#[derive(Debug, thiserror::Error)]
pub enum SpawnError {
/// The remote agent channel failed; converted automatically from
/// [`ChannelError`] via `?`.
#[error("Channel error: {0}")]
Channel(#[from] ChannelError),
/// Starting, waiting on, or doing I/O for a process failed; the payload is a
/// human-readable message.
#[error("Process error: {0}")]
Process(String),
/// A payload could not be decoded.
// NOTE(review): this variant is not constructed anywhere in the visible part
// of this file — confirm where (or whether) it is used.
#[error("Decode error: {0}")]
Decode(String),
}
/// Runs a command to completion and collects its output.
///
/// Implementors only have to provide [`ProcessSpawner::spawn`]; the other
/// methods have default implementations layered on top of it.
#[async_trait::async_trait]
pub trait ProcessSpawner: Send + Sync {
    /// Runs `command` with `args`, optionally in working directory `cwd`, and
    /// returns its captured stdout, stderr and exit code.
    ///
    /// # Errors
    ///
    /// Returns a [`SpawnError`] when the process cannot be run or its output
    /// cannot be collected.
    async fn spawn(
        &self,
        command: String,
        args: Vec<String>,
        cwd: Option<String>,
    ) -> Result<SpawnResult, SpawnError>;

    /// Runs the command and stores its stdout in the file `stdout_to`.
    ///
    /// The default implementation buffers the whole output through
    /// [`ProcessSpawner::spawn`] and writes it afterwards. The file is written
    /// when the process exited with code 0 or produced any stdout at all; the
    /// parent directory of `stdout_to` is created if it is missing. The
    /// returned [`SpawnResult::stdout`] is always empty.
    ///
    /// # Errors
    ///
    /// Propagates any error from `spawn`, and returns [`SpawnError::Process`]
    /// when the directory or the file cannot be written.
    async fn spawn_to_file(
        &self,
        command: String,
        args: Vec<String>,
        cwd: Option<String>,
        stdout_to: std::path::PathBuf,
    ) -> Result<SpawnResult, SpawnError> {
        let result = self.spawn(command, args, cwd).await?;
        if result.exit_code == 0 || !result.stdout.is_empty() {
            if let Some(parent) = stdout_to.parent() {
                // An empty parent means "current directory"; nothing to create.
                if !parent.as_os_str().is_empty() {
                    tokio::fs::create_dir_all(parent).await.map_err(|e| {
                        SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))
                    })?;
                }
            }
            // Use the async filesystem API so a large write does not block the
            // executor thread.
            tokio::fs::write(&stdout_to, result.stdout.as_bytes())
                .await
                .map_err(|e| SpawnError::Process(format!("write {:?}: {}", stdout_to, e)))?;
        }
        Ok(SpawnResult {
            stdout: String::new(),
            stderr: result.stderr,
            exit_code: result.exit_code,
        })
    }

    /// Runs the command like [`ProcessSpawner::spawn`] (or
    /// [`ProcessSpawner::spawn_to_file`] when `stdout_to` is set), with a
    /// channel the caller can use to request that the process be killed.
    ///
    /// The default implementation ignores the kill receiver entirely:
    /// cancellation has no effect unless an implementor overrides this method.
    async fn spawn_cancellable(
        &self,
        command: String,
        args: Vec<String>,
        cwd: Option<String>,
        stdout_to: Option<std::path::PathBuf>,
        _kill_rx: tokio::sync::oneshot::Receiver<()>,
    ) -> Result<SpawnResult, SpawnError> {
        match stdout_to {
            Some(p) => self.spawn_to_file(command, args, cwd, p).await,
            None => self.spawn(command, args, cwd).await,
        }
    }
}
/// Spawner that runs commands as child processes of the current process,
/// using `tokio::process::Command`.
pub struct LocalProcessSpawner;
/// Runs commands on the local machine through `tokio::process::Command`.
#[async_trait::async_trait]
impl ProcessSpawner for LocalProcessSpawner {
    /// Runs the command to completion and captures stdout and stderr in
    /// memory (decoded as lossy UTF-8).
    ///
    /// # Errors
    ///
    /// Returns [`SpawnError::Process`] when the command cannot be started or
    /// its output cannot be collected.
    async fn spawn(
        &self,
        command: String,
        args: Vec<String>,
        cwd: Option<String>,
    ) -> Result<SpawnResult, SpawnError> {
        let mut cmd = tokio::process::Command::new(&command);
        cmd.args(&args);
        cmd.hide_window();
        if let Some(dir) = cwd.as_deref() {
            cmd.current_dir(dir);
        }

        let output = match cmd.output().await {
            Ok(output) => output,
            Err(e) => return Err(SpawnError::Process(e.to_string())),
        };
        Ok(SpawnResult {
            stdout: lossy_text(&output.stdout),
            stderr: lossy_text(&output.stderr),
            exit_code: exit_code_of(&output.status),
        })
    }

    /// Runs the command while listening on `kill_rx`; when that receiver
    /// resolves, the child is killed and then reaped.
    ///
    /// stdout is streamed into `stdout_to` when given (the result's `stdout`
    /// is then empty), otherwise captured in memory. stderr is always
    /// captured. A failure to wait on the child is reported as exit code `-1`
    /// rather than as an error.
    ///
    /// Note that the receiver also resolves when the sender is dropped without
    /// sending, which is treated exactly like a kill request.
    // NOTE(review): stdin is not configured here, so tokio's default for
    // `spawn` (inherit from the parent) applies — confirm that is intended.
    ///
    /// # Errors
    ///
    /// Returns [`SpawnError::Process`] when the output directory cannot be
    /// created, the command cannot be started, or draining a pipe fails.
    async fn spawn_cancellable(
        &self,
        command: String,
        args: Vec<String>,
        cwd: Option<String>,
        stdout_to: Option<std::path::PathBuf>,
        kill_rx: tokio::sync::oneshot::Receiver<()>,
    ) -> Result<SpawnResult, SpawnError> {
        use std::process::Stdio;
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        type DrainTask = tokio::task::JoinHandle<std::io::Result<Vec<u8>>>;

        let mut cmd = tokio::process::Command::new(&command);
        cmd.args(&args);
        cmd.hide_window();
        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());
        if let Some(dir) = cwd.as_deref() {
            cmd.current_dir(dir);
        }
        if let Some(path) = stdout_to.as_deref() {
            ensure_parent_dir(path).await?;
        }

        let mut child = match cmd.spawn() {
            Ok(child) => child,
            Err(e) => return Err(SpawnError::Process(e.to_string())),
        };
        let mut out_pipe = match child.stdout.take() {
            Some(pipe) => pipe,
            None => return Err(SpawnError::Process("child stdout missing".to_string())),
        };
        let mut err_pipe = match child.stderr.take() {
            Some(pipe) => pipe,
            None => return Err(SpawnError::Process("child stderr missing".to_string())),
        };

        // Both pipes are drained on their own tasks, concurrently with the
        // wait below, so neither pipe is left unread while the child runs.
        let stdout_task: DrainTask = match stdout_to {
            Some(path) => tokio::spawn(async move {
                let mut file = tokio::fs::File::create(&path).await?;
                tokio::io::copy(&mut out_pipe, &mut file).await?;
                // Flush/sync failures are logged only; they do not fail the run.
                if let Err(e) = file.flush().await {
                    tracing::warn!("spawn_cancellable: file flush failed: {}", e);
                }
                if let Err(e) = file.sync_all().await {
                    tracing::warn!("spawn_cancellable: file sync_all failed: {}", e);
                }
                Ok(Vec::new())
            }),
            None => tokio::spawn(async move {
                let mut captured = Vec::new();
                out_pipe.read_to_end(&mut captured).await?;
                Ok(captured)
            }),
        };
        let stderr_task: DrainTask = tokio::spawn(async move {
            let mut captured = Vec::new();
            err_pipe.read_to_end(&mut captured).await?;
            Ok(captured)
        });

        let exit_code = tokio::select! {
            waited = child.wait() => wait_exit_code(waited),
            _ = kill_rx => {
                if let Err(e) = child.start_kill() {
                    tracing::debug!("spawn_cancellable: start_kill (already exited?): {}", e);
                }
                // Reap the child after the kill request.
                wait_exit_code(child.wait().await)
            }
        };

        let stdout_bytes = stdout_task
            .await
            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
            .map_err(|e| SpawnError::Process(format!("stdout drain: {}", e)))?;
        let stderr_bytes = stderr_task
            .await
            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;

        Ok(SpawnResult {
            stdout: lossy_text(&stdout_bytes),
            stderr: lossy_text(&stderr_bytes),
            exit_code,
        })
    }

    /// Runs the command and streams its stdout straight into `stdout_to`
    /// instead of buffering it in memory; stderr is still captured.
    ///
    /// The parent directory and the output file are created before the
    /// process is started. The returned `stdout` is always empty.
    // NOTE(review): stdin is not configured here, so tokio's default for
    // `spawn` (inherit from the parent) applies — confirm that is intended.
    ///
    /// # Errors
    ///
    /// Returns [`SpawnError::Process`] when the directory or file cannot be
    /// created, the command cannot be started or waited on, or copying a
    /// pipe fails.
    async fn spawn_to_file(
        &self,
        command: String,
        args: Vec<String>,
        cwd: Option<String>,
        stdout_to: std::path::PathBuf,
    ) -> Result<SpawnResult, SpawnError> {
        use std::process::Stdio;
        use tokio::io::AsyncWriteExt;

        let mut cmd = tokio::process::Command::new(&command);
        cmd.args(&args);
        cmd.hide_window();
        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());
        if let Some(dir) = cwd.as_deref() {
            cmd.current_dir(dir);
        }

        ensure_parent_dir(&stdout_to).await?;
        let mut file = match tokio::fs::File::create(&stdout_to).await {
            Ok(file) => file,
            Err(e) => {
                return Err(SpawnError::Process(format!(
                    "create {:?}: {}",
                    stdout_to, e
                )))
            }
        };

        let mut child = match cmd.spawn() {
            Ok(child) => child,
            Err(e) => return Err(SpawnError::Process(e.to_string())),
        };
        let mut out_pipe = match child.stdout.take() {
            Some(pipe) => pipe,
            None => return Err(SpawnError::Process("child stdout missing".to_string())),
        };
        let mut err_pipe = match child.stderr.take() {
            Some(pipe) => pipe,
            None => return Err(SpawnError::Process("child stderr missing".to_string())),
        };

        let stdout_task = tokio::spawn(async move {
            let copied = tokio::io::copy(&mut out_pipe, &mut file).await;
            // Flush and sync even if the copy failed; failures are logged only.
            if let Err(e) = file.flush().await {
                tracing::warn!("spawn_to_file: file flush failed: {}", e);
            }
            if let Err(e) = file.sync_all().await {
                tracing::warn!("spawn_to_file: file sync_all failed: {}", e);
            }
            copied
        });
        let stderr_task = tokio::spawn(async move {
            let mut captured = Vec::new();
            let copied = tokio::io::copy(&mut err_pipe, &mut captured).await;
            copied.map(|_| captured)
        });

        let status = child
            .wait()
            .await
            .map_err(|e| SpawnError::Process(format!("wait: {}", e)))?;

        stdout_task
            .await
            .map_err(|e| SpawnError::Process(format!("stdout task: {}", e)))?
            .map_err(|e| SpawnError::Process(format!("stdout copy: {}", e)))?;
        let stderr_bytes = stderr_task
            .await
            .map_err(|e| SpawnError::Process(format!("stderr task: {}", e)))?
            .map_err(|e| SpawnError::Process(format!("stderr drain: {}", e)))?;

        Ok(SpawnResult {
            stdout: String::new(),
            stderr: lossy_text(&stderr_bytes),
            exit_code: exit_code_of(&status),
        })
    }
}

/// Creates the parent directory of `path` (including missing ancestors) when
/// the path has a non-empty parent; otherwise does nothing.
async fn ensure_parent_dir(path: &Path) -> Result<(), SpawnError> {
    match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => tokio::fs::create_dir_all(parent)
            .await
            .map_err(|e| SpawnError::Process(format!("create_dir_all {:?}: {}", parent, e))),
        _ => Ok(()),
    }
}

/// Exit code of `status`, or `-1` when the status carries no code.
fn exit_code_of(status: &ExitStatus) -> i32 {
    status.code().unwrap_or(-1)
}

/// Exit code for the result of waiting on a child; a failed wait maps to `-1`.
fn wait_exit_code(waited: std::io::Result<ExitStatus>) -> i32 {
    waited.map_or(-1, |status| exit_code_of(&status))
}

/// Decodes process output as UTF-8, replacing invalid sequences.
fn lossy_text(bytes: &[u8]) -> String {
    String::from_utf8_lossy(bytes).into_owned()
}
/// Spawner that runs commands through a remote agent channel rather than on
/// the local machine.
pub struct RemoteProcessSpawner {
    /// Shared handle to the agent channel that requests are sent over.
    channel: Arc<AgentChannel>,
}

impl RemoteProcessSpawner {
    /// Creates a spawner that sends its requests over `channel`.
    pub fn new(channel: Arc<AgentChannel>) -> Self {
        RemoteProcessSpawner { channel }
    }
}
#[async_trait::async_trait]
impl ProcessSpawner for RemoteProcessSpawner {
async fn spawn(
&self,
command: String,
args: Vec<String>,
cwd: Option<String>,
) -> Result<SpawnResult, SpawnError> {
let params = exec_params(&command, &args, cwd.as_deref());
let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
let mut stdout = Vec::new();
let mut stderr = Vec::new();
while let Some(data) = data_rx.recv().await {
if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
if let Ok(decoded) = decode_base64(out) {
stdout.extend_from_slice(&decoded);
}
}
if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
if let Ok(decoded) = decode_base64(err) {
stderr.extend_from_slice(&decoded);
}
}
}
let result = result_rx
.await
.map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
.map_err(SpawnError::Process)?;
let exit_code = result
.get("code")
.and_then(|v| v.as_i64())
.map(|c| c as i32)
.unwrap_or(-1);
Ok(SpawnResult {
stdout: String::from_utf8_lossy(&stdout).to_string(),
stderr: String::from_utf8_lossy(&stderr).to_string(),
exit_code,
})
}
async fn spawn_to_file(
&self,
_command: String,
_args: Vec<String>,
_cwd: Option<String>,
_stdout_to: std::path::PathBuf,
) -> Result<SpawnResult, SpawnError> {
Err(SpawnError::Process(
"stdoutTo is not supported for remote processes".to_string(),
))
}
}
/// A spawned child process together with its stdio pipes, which the owner can
/// take one at a time.
pub struct StdioChild {
    /// Underlying tokio child handle.
    inner: tokio::process::Child,
    /// Child stdin pipe, until taken.
    stdin: Option<ChildStdin>,
    /// Child stdout pipe, until taken.
    stdout: Option<ChildStdout>,
    /// Child stderr pipe, until taken.
    stderr: Option<ChildStderr>,
    /// Flag recorded at construction; see [`StdioChild::spawned_locally`].
    spawned_locally: bool,
}

impl StdioChild {
    /// Wraps `child`, moving whatever stdio pipes it has into the wrapper.
    pub fn from_tokio_child(mut child: tokio::process::Child, spawned_locally: bool) -> Self {
        // The pipes are taken first; `child` itself is moved last.
        Self {
            stdin: child.stdin.take(),
            stdout: child.stdout.take(),
            stderr: child.stderr.take(),
            inner: child,
            spawned_locally,
        }
    }

    /// Wraps a locally spawned `child` and applies `post_spawn` to its process
    /// id, when the child still has one.
    pub fn from_local_tokio_child(
        child: tokio::process::Child,
        post_spawn: PostSpawnAction,
    ) -> Self {
        let pid = child.id();
        let wrapped = Self::from_tokio_child(child, true);
        if let Some(pid) = pid {
            post_spawn.apply_to_child(pid);
        }
        wrapped
    }

    /// Takes the stdin pipe; later calls return `None`.
    pub fn take_stdin(&mut self) -> Option<ChildStdin> {
        self.stdin.take()
    }

    /// Takes the stdout pipe; later calls return `None`.
    pub fn take_stdout(&mut self) -> Option<ChildStdout> {
        self.stdout.take()
    }

    /// Takes the stderr pipe; later calls return `None`.
    pub fn take_stderr(&mut self) -> Option<ChildStderr> {
        self.stderr.take()
    }

    /// Process id as reported by the underlying tokio child.
    pub fn id(&self) -> Option<u32> {
        self.inner.id()
    }

    /// Value of the `spawned_locally` flag given at construction
    /// ([`StdioChild::from_local_tokio_child`] always sets it to `true`).
    pub fn spawned_locally(&self) -> bool {
        self.spawned_locally
    }

    /// Kills the child; delegates to `tokio::process::Child::kill`.
    pub async fn kill(&mut self) -> std::io::Result<()> {
        self.inner.kill().await
    }

    /// Waits for the child to exit; delegates to `tokio::process::Child::wait`.
    pub async fn wait(&mut self) -> std::io::Result<ExitStatus> {
        self.inner.wait().await
    }
}
/// Starts processes whose stdio is handed back to the caller as a
/// [`StdioChild`] instead of being collected to completion.
#[async_trait::async_trait]
pub trait LongRunningSpawner: Send + Sync {
/// Starts `command` with `args` and returns a handle to the running child.
///
/// * `env` - environment variables to set for the child.
/// * `cwd` - working directory for the child, if any.
/// * `limits` - optional process limits to apply to the child.
///
/// # Errors
///
/// Returns a [`SpawnError`] when the process cannot be started.
async fn spawn_stdio(
&self,
command: &str,
args: &[String],
env: Vec<(String, String)>,
cwd: Option<&Path>,
limits: Option<&ProcessLimits>,
) -> Result<StdioChild, SpawnError>;
/// Returns whether `command` is available to this spawner.
async fn command_exists(&self, command: &str) -> bool;
}
/// [`LongRunningSpawner`] that starts child processes on the local machine.
pub struct LocalLongRunningSpawner;
/// Local implementation built on `tokio::process::Command`.
#[async_trait::async_trait]
impl LongRunningSpawner for LocalLongRunningSpawner {
    /// Starts `command` with stdin, stdout and stderr all piped and returns it
    /// wrapped in a [`StdioChild`].
    ///
    /// The child is configured with `kill_on_drop(true)`. When `limits` is
    /// given it is applied to the command before spawning, and the resulting
    /// post-spawn action is applied once the child exists.
    ///
    /// # Errors
    ///
    /// Returns [`SpawnError::Process`] when the limits cannot be applied or
    /// the command cannot be started.
    async fn spawn_stdio(
        &self,
        command: &str,
        args: &[String],
        env: Vec<(String, String)>,
        cwd: Option<&Path>,
        limits: Option<&ProcessLimits>,
    ) -> Result<StdioChild, SpawnError> {
        use std::process::Stdio;

        let mut cmd = tokio::process::Command::new(command);
        cmd.args(args);
        cmd.envs(env);
        cmd.stdin(Stdio::piped());
        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());
        cmd.hide_window();
        cmd.kill_on_drop(true);
        if let Some(dir) = cwd {
            cmd.current_dir(dir);
        }

        let post_spawn = if let Some(lim) = limits {
            lim.apply_to_command(&mut cmd)
                .map_err(|e| SpawnError::Process(format!("Failed to apply process limits: {e}")))?
        } else {
            PostSpawnAction::default()
        };

        match cmd.spawn() {
            Ok(child) => Ok(StdioChild::from_local_tokio_child(child, post_spawn)),
            Err(e) => Err(SpawnError::Process(e.to_string())),
        }
    }

    /// Returns whether `which` can resolve `command`.
    async fn command_exists(&self, command: &str) -> bool {
        matches!(which::which(command), Ok(_))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    // `spawn` captures stdout in memory and reports the exit code.
    #[tokio::test]
    async fn test_local_spawner() {
        let spawner = LocalProcessSpawner;
        let result = spawner
            .spawn("echo".to_string(), vec!["hello".to_string()], None)
            .await
            .unwrap();
        assert_eq!(result.exit_code, 0);
        // `assert_eq!` prints the actual output when the comparison fails.
        assert_eq!(result.stdout.trim(), "hello");
    }

    // `spawn_to_file` streams stdout to disk and returns an empty `stdout`.
    #[tokio::test]
    async fn test_local_spawner_stdout_to_file() {
        let spawner = LocalProcessSpawner;
        let tmp =
            std::env::temp_dir().join(format!("fresh-spawner-test-{}.out", std::process::id()));
        // Best-effort cleanup: the file usually does not exist yet.
        #[allow(clippy::let_underscore_must_use)]
        let _ = std::fs::remove_file(&tmp);
        let result = spawner
            .spawn_to_file(
                "echo".to_string(),
                vec!["hello-from-disk".to_string()],
                None,
                tmp.clone(),
            )
            .await
            .unwrap();
        assert_eq!(result.exit_code, 0);
        assert!(
            result.stdout.is_empty(),
            "stdout should be empty when streaming"
        );
        let contents = std::fs::read_to_string(&tmp).expect("output file should exist");
        assert_eq!(contents.trim(), "hello-from-disk");
        // Best-effort cleanup of the temporary output file.
        #[allow(clippy::let_underscore_must_use)]
        let _ = std::fs::remove_file(&tmp);
    }

    // Sending on the kill channel terminates a long-running child promptly.
    #[tokio::test]
    async fn test_local_spawner_cancellable_kill() {
        let spawner = LocalProcessSpawner;
        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
        let task = tokio::spawn(async move {
            spawner
                .spawn_cancellable(
                    "sleep".to_string(),
                    vec!["30".to_string()],
                    None,
                    None,
                    kill_rx,
                )
                .await
        });
        // Give the child a moment to start before requesting the kill.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        // The send result is ignored; the assertions below check the outcome.
        #[allow(clippy::let_underscore_must_use)]
        let _ = kill_tx.send(());
        let start = std::time::Instant::now();
        let result = task.await.unwrap().unwrap();
        let elapsed = start.elapsed();
        assert!(
            elapsed < std::time::Duration::from_secs(5),
            "kill should be prompt, took {:?}",
            elapsed
        );
        assert_ne!(result.exit_code, 0, "killed process shouldn't be exit 0");
    }

    // `spawn_stdio` pipes stdout and marks the child as locally spawned.
    #[tokio::test]
    async fn local_long_running_spawn_stdio_pipes_output() {
        let spawner = LocalLongRunningSpawner;
        let mut child = spawner
            .spawn_stdio(
                "sh",
                &["-c".into(), "echo hi".into()],
                Vec::new(),
                None,
                None,
            )
            .await
            .expect("spawn succeeds");
        let mut stdout = child.take_stdout().expect("stdout piped");
        let mut buf = String::new();
        stdout.read_to_string(&mut buf).await.unwrap();
        assert_eq!(buf.trim(), "hi");
        let status = child.wait().await.unwrap();
        assert!(status.success());
        assert!(child.spawned_locally());
    }

    // `command_exists` finds `sh` and rejects a name that should not exist.
    #[tokio::test]
    async fn local_long_running_command_exists_for_sh() {
        let spawner = LocalLongRunningSpawner;
        assert!(spawner.command_exists("sh").await);
        assert!(
            !spawner
                .command_exists("fresh-unlikely-binary-name-ygzu9")
                .await
        );
    }
}