use crate::error::{ConnectorError, Result};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use tokio::process::Command;
#[derive(Debug, Clone)]
pub struct CommandOutput {
pub stdout: String,
pub stderr: String,
pub exit_code: Option<i32>,
pub success: bool,
}
#[derive(Debug, Clone, Default)]
pub struct CommandOptions {
pub timeout: Option<Duration>,
pub working_dir: Option<PathBuf>,
pub env: Vec<(String, String)>,
pub clear_env: bool,
pub stdin_data: Option<Vec<u8>>,
}
pub async fn run_command(program: &str, args: &[&str]) -> Result<CommandOutput> {
run_command_opts(program, args, CommandOptions::default()).await
}
pub async fn run_command_with_timeout(
program: &str,
args: &[&str],
timeout: Duration,
) -> Result<CommandOutput> {
run_command_opts(
program,
args,
CommandOptions {
timeout: Some(timeout),
..Default::default()
},
)
.await
}
pub async fn run_command_opts(
program: &str,
args: &[&str],
options: CommandOptions,
) -> Result<CommandOutput> {
let mut cmd = Command::new(program);
cmd.args(args);
if options.clear_env {
cmd.env_clear();
}
for (key, value) in &options.env {
cmd.env(key, value);
}
if let Some(ref dir) = options.working_dir {
cmd.current_dir(dir);
}
if options.stdin_data.is_some() {
cmd.stdin(std::process::Stdio::piped());
}
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
let mut child = cmd
.spawn()
.map_err(|e| ConnectorError::Other(format!("Failed to spawn '{program}': {e}")))?;
if let Some(data) = options.stdin_data {
use std::io::ErrorKind;
use tokio::io::AsyncWriteExt;
if let Some(mut stdin) = child.stdin.take() {
if let Err(e) = stdin.write_all(&data).await
&& e.kind() != ErrorKind::BrokenPipe
{
return Err(ConnectorError::Other(format!(
"Failed to write stdin to '{program}': {e}"
)));
}
drop(stdin);
}
}
let output_future = child.wait_with_output();
let output = if let Some(timeout) = options.timeout {
match tokio::time::timeout(timeout, output_future).await {
Ok(result) => result.map_err(|e| {
ConnectorError::Other(format!("Command '{program}' I/O error: {e}"))
})?,
Err(_) => {
return Err(ConnectorError::Timeout(format!(
"Command '{program}' timed out after {timeout:?}"
)));
}
}
} else {
output_future
.await
.map_err(|e| ConnectorError::Other(format!("Command '{program}' I/O error: {e}")))?
};
Ok(CommandOutput {
stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
exit_code: output.status.code(),
success: output.status.success(),
})
}
pub async fn run_shell(command: &str) -> Result<CommandOutput> {
run_command("sh", &["-c", command]).await
}
pub async fn run_shell_with_timeout(command: &str, timeout: Duration) -> Result<CommandOutput> {
run_command_with_timeout("sh", &["-c", command], timeout).await
}
pub async fn run_command_stdout(program: &str, args: &[&str]) -> Result<String> {
let output = run_command(program, args).await?;
if !output.success {
return Err(ConnectorError::Other(format!(
"Command '{program}' failed (exit {}): {}",
output.exit_code.unwrap_or(-1),
output.stderr.trim()
)));
}
Ok(output.stdout.trim().to_string())
}
pub struct CommandBuilder {
program: String,
args: Vec<String>,
options: CommandOptions,
}
impl CommandBuilder {
pub fn new(program: impl Into<String>) -> Self {
Self {
program: program.into(),
args: Vec::new(),
options: CommandOptions::default(),
}
}
pub fn arg(mut self, arg: impl Into<String>) -> Self {
self.args.push(arg.into());
self
}
pub fn args(mut self, args: &[&str]) -> Self {
self.args.extend(args.iter().map(|s| s.to_string()));
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.options.timeout = Some(timeout);
self
}
pub fn working_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.options.working_dir = Some(dir.into());
self
}
pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.options.env.push((key.into(), value.into()));
self
}
pub fn envs(mut self, vars: HashMap<String, String>) -> Self {
self.options.env.extend(vars);
self
}
pub fn clear_env(mut self) -> Self {
self.options.clear_env = true;
self
}
pub fn stdin(mut self, data: impl Into<Vec<u8>>) -> Self {
self.options.stdin_data = Some(data.into());
self
}
pub async fn run(self) -> Result<CommandOutput> {
let args: Vec<&str> = self.args.iter().map(|s| s.as_str()).collect();
run_command_opts(&self.program, &args, self.options).await
}
pub async fn run_stdout(self) -> Result<String> {
let program = self.program.clone();
let output = self.run().await?;
if !output.success {
return Err(ConnectorError::Other(format!(
"Command '{program}' failed (exit {}): {}",
output.exit_code.unwrap_or(-1),
output.stderr.trim()
)));
}
Ok(output.stdout.trim().to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_run_command_echo() {
let output = run_command("echo", &["hello", "world"]).await.unwrap();
assert!(output.success);
assert_eq!(output.stdout.trim(), "hello world");
assert_eq!(output.exit_code, Some(0));
}
#[tokio::test]
async fn test_run_command_nonexistent() {
let result = run_command("nonexistent_binary_xyz", &[]).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_run_command_exit_code() {
let output = run_command("sh", &["-c", "exit 42"]).await.unwrap();
assert!(!output.success);
assert_eq!(output.exit_code, Some(42));
}
#[tokio::test]
async fn test_run_command_stderr() {
let output = run_command("sh", &["-c", "echo error >&2"]).await.unwrap();
assert!(output.success);
assert_eq!(output.stderr.trim(), "error");
}
#[tokio::test]
async fn test_run_command_with_timeout_success() {
let output = run_command_with_timeout("echo", &["fast"], Duration::from_secs(5))
.await
.unwrap();
assert!(output.success);
assert_eq!(output.stdout.trim(), "fast");
}
#[tokio::test]
async fn test_run_command_with_timeout_exceeded() {
let result = run_command_with_timeout("sleep", &["10"], Duration::from_millis(100)).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("timed out"));
}
#[tokio::test]
async fn test_run_shell() {
let output = run_shell("echo $((2 + 3))").await.unwrap();
assert!(output.success);
assert_eq!(output.stdout.trim(), "5");
}
#[tokio::test]
async fn test_run_command_stdout_helper() {
let stdout = run_command_stdout("echo", &["hello"]).await.unwrap();
assert_eq!(stdout, "hello");
}
#[tokio::test]
async fn test_run_command_stdout_fails_on_error() {
let result = run_command_stdout("sh", &["-c", "echo fail >&2; exit 1"]).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_command_builder() {
let output = CommandBuilder::new("echo")
.args(&["hello", "builder"])
.timeout(Duration::from_secs(5))
.run()
.await
.unwrap();
assert!(output.success);
assert_eq!(output.stdout.trim(), "hello builder");
}
#[tokio::test]
async fn test_command_builder_with_env() {
let output = CommandBuilder::new("sh")
.args(&["-c", "echo $MY_VAR"])
.env("MY_VAR", "test_value")
.run()
.await
.unwrap();
assert!(output.success);
assert_eq!(output.stdout.trim(), "test_value");
}
#[tokio::test]
async fn test_command_with_stdin() {
let output = run_command_opts(
"cat",
&[],
CommandOptions {
stdin_data: Some(b"piped input".to_vec()),
..Default::default()
},
)
.await
.unwrap();
assert!(output.success);
assert_eq!(output.stdout, "piped input");
}
#[tokio::test]
async fn test_stdin_broken_pipe_tolerated() {
let output = run_command_opts(
"head",
&["-c", "1"],
CommandOptions {
stdin_data: Some(b"lots of data that head will not fully read".to_vec()),
..Default::default()
},
)
.await
.unwrap();
assert!(output.success);
}
#[tokio::test]
async fn test_concurrent_commands_no_thread_starvation() {
use std::time::Instant;
let start = Instant::now();
let mut handles = Vec::new();
for i in 0..20 {
handles.push(tokio::spawn(async move {
run_command_with_timeout("sleep", &["1"], Duration::from_secs(5))
.await
.unwrap();
i
}));
}
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
let elapsed = start.elapsed();
assert_eq!(results.len(), 20);
assert!(
elapsed < Duration::from_secs(5),
"20 concurrent sleeps took {:?} — thread starvation detected",
elapsed
);
}
}