use serde_json::Value;
use std::path::Path;
use std::time::Duration;
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc;
/// Timeout, in seconds, applied when the caller supplies neither `timeout_ms`
/// nor `timeout_secs`.
const DEFAULT_TIMEOUT_SECS: u64 = 60;
/// Upper bound on the output limit handed to the truncation helper; it is also
/// the limit used as-is when the token budget is `0`.
const MAX_OUTPUT_BYTES: usize = 65_536;
/// Runs the shell command described by a JSON argument object and returns its
/// formatted output.
///
/// Recognised keys in `args`:
/// - `command` (string, required): the command line to run.
/// - `timeout_ms` (unsigned integer, optional): timeout in milliseconds; wins
///   over `timeout_secs` when both are present.
/// - `timeout_secs` (unsigned integer, optional): timeout in seconds, used only
///   when `timeout_ms` is absent. With neither key the timeout is
///   `DEFAULT_TIMEOUT_SECS`.
/// - `run_in_background` (bool, optional, default `false`).
///
/// `budget_tokens` sizes the output limit: it is multiplied by 4 and clamped to
/// `1000..=MAX_OUTPUT_BYTES`; a budget of `0` selects `MAX_OUTPUT_BYTES`.
///
/// # Errors
/// Returns `Err` when `command` is missing or not a string, when the current
/// working directory cannot be read, or when `execute_command_in_dir` fails.
pub async fn execute(args: &Value, budget_tokens: usize) -> Result<String, String> {
    let effective_limit = if budget_tokens == 0 {
        MAX_OUTPUT_BYTES
    } else {
        budget_tokens
            .saturating_mul(4)
            .clamp(1000, MAX_OUTPUT_BYTES)
    };

    let mut command = args
        .get("command")
        .and_then(|v| v.as_str())
        .ok_or_else(|| "Missing required argument: 'command'".to_string())?
        .to_string();

    // Expand `@` into "<workspace root>/", using forward slashes throughout.
    // NOTE(review): this rewrites *every* `@` in the command, including ones in
    // `user@host` style arguments — confirm that is intended.
    if command.contains('@') {
        let root = crate::tools::file_ops::workspace_root();
        let root_str = root.to_string_lossy().replace('\\', "/");
        command = command.replace('@', &format!("{}/", root_str.trim_end_matches('/')));
    }

    let timeout_ms = args
        .get("timeout_ms")
        .and_then(|v| v.as_u64())
        .or_else(|| {
            args.get("timeout_secs")
                .and_then(|v| v.as_u64())
                // Saturate rather than overflow: a huge `timeout_secs` must not
                // panic (debug) or wrap around to a tiny timeout (release).
                .map(|secs| secs.saturating_mul(1000))
        })
        .unwrap_or(DEFAULT_TIMEOUT_SECS * 1000);

    let run_in_background = args
        .get("run_in_background")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);

    let cwd =
        std::env::current_dir().map_err(|e| format!("Failed to get working directory: {e}"))?;

    execute_command_in_dir(
        &command,
        &cwd,
        timeout_ms,
        run_in_background,
        effective_limit,
    )
    .await
}
/// Runs a shell command like `execute`, but forwards every output line to `tx`
/// while the command is still running.
///
/// Each stdout line is sent as `InferenceEvent::ShellLine(line)`; each stderr
/// line is sent with an `[err] ` prefix. Trailing `\r` characters are trimmed
/// from every line, and failures to send on `tx` are ignored.
///
/// `args` accepts the same keys as `execute` (`command`, `timeout_ms`,
/// `timeout_secs`, `run_in_background`). When `run_in_background` is `true`
/// the call is delegated to `execute` and nothing is streamed.
///
/// The returned string holds stdout, then a `[stderr]` section if stderr was
/// non-empty, `(no output)` if both were empty, and an exit-code or signal
/// note for non-zero exits. It is passed through `strip_ansi` and truncated to
/// the limit derived from `budget_tokens` (see `execute`).
///
/// # Errors
/// Returns `Err` when `command` is missing or not a string, when the safety
/// guard rejects the command, when the working directory cannot be read, when
/// the process cannot be spawned or waited on, when the timeout elapses (the
/// child is killed), or when the process has not exited 5 s after both output
/// streams closed.
///
/// # Panics
/// Panics if the spawned child has no stdout/stderr handle, which cannot
/// happen because both are configured as piped before spawning.
pub async fn execute_streaming(
    args: &Value,
    tx: mpsc::Sender<crate::agent::inference::InferenceEvent>,
    budget_tokens: usize,
) -> Result<String, String> {
    let effective_limit = if budget_tokens == 0 {
        MAX_OUTPUT_BYTES
    } else {
        budget_tokens
            .saturating_mul(4)
            .clamp(1000, MAX_OUTPUT_BYTES)
    };

    // Background jobs produce nothing to stream; reuse the plain path.
    let wants_background = args
        .get("run_in_background")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    if wants_background {
        return execute(args, budget_tokens).await;
    }

    let mut command = args
        .get("command")
        .and_then(|v| v.as_str())
        .ok_or_else(|| "Missing required argument: 'command'".to_string())?
        .to_string();

    // Expand `@` into "<workspace root>/", using forward slashes throughout.
    // NOTE(review): this rewrites *every* `@` in the command — confirm intended.
    if command.contains('@') {
        let root = crate::tools::file_ops::workspace_root();
        let root_str = root.to_string_lossy().replace('\\', "/");
        command = command.replace('@', &format!("{}/", root_str.trim_end_matches('/')));
    }

    let timeout_ms = args
        .get("timeout_ms")
        .and_then(|v| v.as_u64())
        .or_else(|| {
            args.get("timeout_secs")
                .and_then(|v| v.as_u64())
                // Saturate rather than overflow: a huge `timeout_secs` must not
                // panic (debug) or wrap around to a tiny timeout (release).
                .map(|secs| secs.saturating_mul(1000))
        })
        .unwrap_or(DEFAULT_TIMEOUT_SECS * 1000);

    crate::tools::guard::bash_is_safe(&command)?;

    let cwd =
        std::env::current_dir().map_err(|e| format!("Failed to get working directory: {e}"))?;

    let mut tokio_cmd = build_command(&command).await;
    tokio_cmd
        .current_dir(&cwd)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        // Never let the child outlive this call: if we bail out early or the
        // caller drops this future, the process is killed with the handle.
        .kill_on_drop(true);

    // Point HOME/TMPDIR at a sandbox directory; creating it is best-effort.
    let sandbox_root = crate::tools::file_ops::hematite_dir().join("sandbox");
    let _ = std::fs::create_dir_all(&sandbox_root);
    tokio_cmd.env("HOME", &sandbox_root);
    tokio_cmd.env("TMPDIR", &sandbox_root);

    let mut child = tokio_cmd
        .spawn()
        .map_err(|e| format!("Failed to spawn process: {e}"))?;
    let stdout = child.stdout.take().expect("stdout was piped");
    let stderr = child.stderr.take().expect("stderr was piped");
    let mut stdout_lines = tokio::io::BufReader::new(stdout).lines();
    let mut stderr_lines = tokio::io::BufReader::new(stderr).lines();

    let mut out_buf = String::new();
    let mut err_buf = String::new();
    let mut stdout_done = false;
    let mut stderr_done = false;
    let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);

    // Drain both pipes concurrently until each reports EOF (or a read error),
    // giving up as soon as the deadline passes.
    while !(stdout_done && stderr_done) {
        tokio::select! {
            _ = tokio::time::sleep_until(deadline) => {
                let _ = child.kill().await;
                return Err(format!("Command timed out after {} ms: {}", timeout_ms, command));
            }
            line = stdout_lines.next_line(), if !stdout_done => {
                match line {
                    Ok(Some(l)) => {
                        let clean = l.trim_end_matches('\r');
                        let _ = tx
                            .send(crate::agent::inference::InferenceEvent::ShellLine(
                                clean.to_string(),
                            ))
                            .await;
                        out_buf.push_str(clean);
                        out_buf.push('\n');
                    }
                    // `Ok(None)` is EOF; a read error also ends this stream.
                    _ => stdout_done = true,
                }
            }
            line = stderr_lines.next_line(), if !stderr_done => {
                match line {
                    Ok(Some(l)) => {
                        let clean = l.trim_end_matches('\r');
                        let _ = tx
                            .send(crate::agent::inference::InferenceEvent::ShellLine(
                                format!("[err] {}", clean),
                            ))
                            .await;
                        err_buf.push_str(clean);
                        err_buf.push('\n');
                    }
                    // `Ok(None)` is EOF; a read error also ends this stream.
                    _ => stderr_done = true,
                }
            }
        }
    }

    // Both pipes are closed; allow a short grace period for the exit status.
    let waited = tokio::time::timeout(Duration::from_millis(5_000), child.wait()).await;
    let status = match waited {
        Ok(exit) => exit.map_err(|e| format!("Failed to wait for process: {e}"))?,
        Err(_) => {
            // Do not leave a process behind that refuses to exit.
            let _ = child.kill().await;
            return Err("Process cleanup timed out".to_string());
        }
    };

    let exit_info = match status.code() {
        Some(0) => String::new(),
        Some(code) => format!("\n[exit code: {code}]"),
        None => "\n[process terminated by signal]".to_string(),
    };

    let mut result = String::new();
    if !out_buf.is_empty() {
        result.push_str(&out_buf);
    }
    if !err_buf.is_empty() {
        if !result.is_empty() {
            result.push('\n');
        }
        result.push_str("[stderr]\n");
        result.push_str(&err_buf);
    }
    if result.is_empty() {
        result.push_str("(no output)");
    }
    result.push_str(&exit_info);

    let clean = crate::agent::utils::strip_ansi(&result);
    Ok(crate::agent::truncation::formatted_truncate(
        &clean,
        effective_limit,
    ))
}
/// Runs `command` through the platform shell with `cwd` as its working
/// directory and returns the formatted, truncated output.
///
/// # Parameters
/// - `command`: command line; it must pass `crate::tools::guard::bash_is_safe`.
/// - `cwd`: working directory for the child process.
/// - `timeout_ms`: maximum run time in milliseconds (foreground mode only).
/// - `run_in_background`: when `true`, the process is spawned and left
///   running; a fixed notice is returned immediately and no output is
///   captured.
/// - `limit_bytes`: limit passed to the truncation helper.
///
/// # Returns
/// In foreground mode: stdout, then a `[stderr]` section if stderr was
/// non-empty, `(no output)` if both were empty, followed by an exit-code or
/// signal note for non-zero exits. The text is passed through `strip_ansi`
/// before truncation.
///
/// # Errors
/// Returns `Err` when the guard rejects the command, the process cannot be
/// started, or the timeout elapses (the child is then killed).
pub async fn execute_command_in_dir(
    command: &str,
    cwd: &Path,
    timeout_ms: u64,
    run_in_background: bool,
    limit_bytes: usize,
) -> Result<String, String> {
    crate::tools::guard::bash_is_safe(command)?;

    let mut tokio_cmd = build_command(command).await;
    tokio_cmd.current_dir(cwd);

    // Point HOME/TMPDIR at a sandbox directory; creating it is best-effort.
    let sandbox_root = crate::tools::file_ops::hematite_dir().join("sandbox");
    let _ = std::fs::create_dir_all(&sandbox_root);
    tokio_cmd.env("HOME", &sandbox_root);
    tokio_cmd.env("TMPDIR", &sandbox_root);

    if run_in_background {
        // The child handle is dropped right away. With piped stdio that would
        // close the read ends and break the child on its first write, so
        // background jobs get their output discarded instead.
        tokio_cmd
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null());
        let _child = tokio_cmd
            .spawn()
            .map_err(|e| format!("Failed to spawn background process: {e}"))?;
        return Ok(
            "[background_task_id: spawned]\nCommand started in background. Use `ps` or `jobs` to monitor if available."
                .into(),
        );
    }

    tokio_cmd
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        // Without this, a timed-out command would keep running after the
        // `output()` future below is dropped.
        .kill_on_drop(true);

    let child_future = tokio_cmd.output();
    let output = match tokio::time::timeout(Duration::from_millis(timeout_ms), child_future).await {
        Ok(Ok(output)) => output,
        Ok(Err(e)) => return Err(format!("Failed to execute process: {e}")),
        Err(_) => {
            return Err(format!(
                "Command timed out after {} ms: {}",
                timeout_ms, command
            ))
        }
    };

    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);
    let exit_info = match output.status.code() {
        Some(0) => String::new(),
        Some(code) => format!("\n[exit code: {code}]"),
        None => "\n[process terminated by signal]".to_string(),
    };

    let mut result = String::new();
    if !stdout.is_empty() {
        result.push_str(&stdout);
    }
    if !stderr.is_empty() {
        if !result.is_empty() {
            result.push('\n');
        }
        result.push_str("[stderr]\n");
        result.push_str(&stderr);
    }
    if result.is_empty() {
        result.push_str("(no output)");
    }
    result.push_str(&exit_info);

    let clean = crate::agent::utils::strip_ansi(&result);
    Ok(crate::agent::truncation::formatted_truncate(
        &clean,
        limit_bytes,
    ))
}
/// Builds the process invocation that runs `command` through the platform
/// shell.
///
/// - Windows: every `/dev/null` in the command is rewritten to `$null`, and the
///   result is passed to `pwsh` when `which("pwsh")` finds it, otherwise to
///   `powershell`, with `-NoProfile -NonInteractive -Command`.
/// - Other targets: the command is passed unchanged to `sh -c`.
///
/// The returned command has no stdio, environment or working directory
/// configured; callers set those up.
async fn build_command(command: &str) -> tokio::process::Command {
    #[cfg(target_os = "windows")]
    {
        // One replacement covers every redirect form (`>/dev/null`,
        // `1>/dev/null`, `2>/dev/null`): the stream number in front of `>` is
        // left untouched and only the target is swapped.
        let normalized = command.replace("/dev/null", "$null");
        let shell = if which("pwsh").await {
            "pwsh"
        } else {
            "powershell"
        };
        let mut cmd = tokio::process::Command::new(shell);
        cmd.args([
            "-NoProfile",
            "-NonInteractive",
            "-Command",
            normalized.as_str(),
        ]);
        cmd
    }
    #[cfg(not(target_os = "windows"))]
    {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(["-c", command]);
        cmd
    }
}
/// Reports whether a program called `name` can be found on this machine.
///
/// - Windows: runs `where <name>.exe`.
/// - Other targets: runs `sh -c 'command -v "$1"' sh <name>`. `where` is not a
///   standard executable there, so the POSIX `command -v` lookup is used
///   instead. Note that `command -v` also succeeds for shell built-ins.
///
/// The probe's output is discarded. Returns `false` when the probe cannot be
/// started or exits unsuccessfully.
// Within the visible source only the Windows branch of `build_command` calls
// this, so other targets would otherwise warn about an unused function.
#[allow(dead_code)]
async fn which(name: &str) -> bool {
    #[cfg(target_os = "windows")]
    let mut probe = {
        let mut cmd = tokio::process::Command::new("where");
        cmd.arg(format!("{}.exe", name));
        cmd
    };
    #[cfg(not(target_os = "windows"))]
    let mut probe = {
        let mut cmd = tokio::process::Command::new("sh");
        // `name` is passed as a positional argument ($1), never spliced into
        // the script text, so it cannot be interpreted as shell syntax.
        cmd.args(["-c", "command -v \"$1\"", "sh", name]);
        cmd
    };
    probe
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .await
        .map(|s| s.success())
        .unwrap_or(false)
}