use std::{process::Stdio, time::Instant};
use crate::error::{CommandError, Error, Result};
use tokio::{
process::{Child, Command},
time::timeout,
};
use super::types::{
Command as CmdConfig, CommandOutput, CommandStream, DefaultCommandExecutor,
GlobalExecutorState, SharedSyncExecutor, StreamConfig, SyncCommandExecutor,
};
use crate::config::CommandConfig;
#[async_trait::async_trait]
pub trait Executor: Send + Sync {
async fn execute(&self, command: CmdConfig) -> Result<CommandOutput>;
async fn execute_stream(
&self,
command: CmdConfig,
stream_config: StreamConfig,
) -> Result<(CommandStream, Child)>;
}
impl DefaultCommandExecutor {
#[must_use]
pub fn new() -> Self {
Self { config: CommandConfig::default() }
}
#[must_use]
pub fn new_with_config(config: CommandConfig) -> Self {
Self { config }
}
#[must_use]
pub fn config(&self) -> &CommandConfig {
&self.config
}
fn build_command(config: &CmdConfig) -> Command {
#[cfg(target_os = "windows")]
let mut cmd = {
let mut cmd = Command::new("cmd");
cmd.arg("/C").arg(&config.program);
cmd.args(&config.args);
cmd
};
#[cfg(not(target_os = "windows"))]
let mut cmd = {
let mut cmd = Command::new(&config.program);
cmd.args(&config.args);
cmd
};
cmd.envs(&config.env).stdout(Stdio::piped()).stderr(Stdio::piped());
if let Some(dir) = &config.current_dir {
cmd.current_dir(dir);
}
cmd
}
}
#[async_trait::async_trait]
impl Executor for DefaultCommandExecutor {
async fn execute(&self, command: CmdConfig) -> Result<CommandOutput> {
let start_time = Instant::now();
let mut cmd = Self::build_command(&command);
let cmd_str = command.program.clone();
let timeout_duration = command.timeout.unwrap_or(self.config.default_timeout);
let child = cmd.spawn().map_err(|e| {
Error::Command(CommandError::SpawnFailed {
cmd: cmd_str.clone(),
message: e.to_string(),
})
})?;
let child_pid = child.id();
let output_result = timeout(timeout_duration, child.wait_with_output()).await;
let output = match output_result {
Ok(Ok(output)) => output,
Ok(Err(e)) => {
return Err(Error::Command(CommandError::ExecutionFailed {
cmd: cmd_str,
message: e.to_string(),
}));
}
Err(_) => {
if let Some(pid) = child_pid {
log::warn!(
"Process (PID: {pid}) timed out after {timeout_duration:?}. Manual cleanup might be required."
);
} else {
log::warn!(
"Process timed out after {timeout_duration:?}, but PID was unavailable."
);
}
return Err(Error::Command(CommandError::Timeout { duration: timeout_duration }));
}
};
let duration = start_time.elapsed();
let code = output.status.code();
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
if !output.status.success() {
return Err(Error::Command(CommandError::NonZeroExitCode {
cmd: cmd_str,
code,
stderr,
}));
}
Ok(CommandOutput::new(code.unwrap_or(0), stdout, stderr, duration))
}
async fn execute_stream(
&self,
command: CmdConfig,
stream_config: StreamConfig,
) -> Result<(CommandStream, Child)> {
let mut cmd = Self::build_command(&command);
let cmd_str = command.program.clone();
let mut child = cmd.spawn().map_err(|e| {
Error::Command(CommandError::SpawnFailed { cmd: cmd_str, message: e.to_string() })
})?;
let stdout = child
.stdout
.take()
.ok_or(Error::Command(CommandError::CaptureFailed { stream: "stdout".to_string() }))?;
let stderr = child
.stderr
.take()
.ok_or(Error::Command(CommandError::CaptureFailed { stream: "stderr".to_string() }))?;
let stream = CommandStream::new(stdout, stderr, &stream_config);
Ok((stream, child)) }
}
impl SyncCommandExecutor {
pub fn new() -> Result<Self> {
let runtime =
tokio::runtime::Builder::new_multi_thread().enable_all().build().map_err(|e| {
Error::Command(CommandError::Generic(format!(
"Failed to create runtime for sync executor: {e}"
)))
})?;
Ok(Self { runtime, executor: DefaultCommandExecutor::new() })
}
pub fn new_with_config(config: CommandConfig) -> Result<Self> {
let runtime =
tokio::runtime::Builder::new_multi_thread().enable_all().build().map_err(|e| {
Error::Command(CommandError::Generic(format!(
"Failed to create runtime for sync executor: {e}"
)))
})?;
Ok(Self { runtime, executor: DefaultCommandExecutor::new_with_config(config) })
}
pub fn execute_sync(&self, command: CmdConfig) -> Result<CommandOutput> {
self.runtime.block_on(self.executor.execute(command))
}
pub fn execute_sync_with_timeout(
&self,
command: CmdConfig,
timeout: std::time::Duration,
) -> Result<CommandOutput> {
self.runtime.block_on(async {
tokio::time::timeout(timeout, self.executor.execute(command))
.await
.map_err(|_| Error::Command(CommandError::Timeout { duration: timeout }))?
})
}
pub fn runtime_handle(&self) -> tokio::runtime::Handle {
self.runtime.handle().clone()
}
}
impl SharedSyncExecutor {
pub fn try_instance() -> Result<&'static SharedSyncExecutor> {
Self::try_instance_with_config(CommandConfig::default())
}
pub fn try_instance_with_config(config: CommandConfig) -> Result<&'static SharedSyncExecutor> {
use std::sync::{Arc, Mutex, OnceLock};
static GLOBAL_STATE: OnceLock<Mutex<GlobalExecutorState>> = OnceLock::new();
let state_lock =
GLOBAL_STATE.get_or_init(|| Mutex::new(GlobalExecutorState::Uninitialized));
let mut guard = state_lock.lock().map_err(|_| {
Error::Command(crate::error::CommandError::Generic(
"Failed to acquire lock for shared sync executor".to_string(),
))
})?;
match &*guard {
GlobalExecutorState::Success(executor) => Ok(executor),
GlobalExecutorState::Error(error) => Err(error.clone()),
GlobalExecutorState::Uninitialized => {
match SyncCommandExecutor::new_with_config(config) {
Ok(sync_executor) => {
let shared_executor =
SharedSyncExecutor { executor: Arc::new(sync_executor) };
*guard = GlobalExecutorState::Success(Box::leak(Box::new(shared_executor)));
if let GlobalExecutorState::Success(executor) = &*guard {
Ok(executor)
} else {
Err(Error::Command(crate::error::CommandError::Generic(
"Internal state corruption in shared sync executor".to_string(),
)))
}
}
Err(e) => {
*guard = GlobalExecutorState::Error(e.clone());
Err(e)
}
}
}
}
}
pub fn execute(&self, command: CmdConfig) -> Result<CommandOutput> {
self.executor.execute_sync(command)
}
pub fn execute_with_timeout(
&self,
command: CmdConfig,
timeout: std::time::Duration,
) -> Result<CommandOutput> {
self.executor.execute_sync_with_timeout(command, timeout)
}
}