use crate::config::Job;
use crate::error::{Error, Result};
use std::process::Stdio;
use std::sync::RwLock;
use tokio::io::AsyncReadExt;
use tokio::process::Command;
use tokio::time::{timeout, Duration};
use tracing::{debug, instrument, warn};
pub struct JobExecutor {
shell: RwLock<String>,
shell_args: RwLock<Vec<String>>,
}
impl JobExecutor {
pub fn new(shell: String, shell_args: Vec<String>) -> Self {
Self {
shell: RwLock::new(shell),
shell_args: RwLock::new(shell_args),
}
}
pub fn update_shell(&self, shell: String, shell_args: Vec<String>) {
*self.shell.write().unwrap() = shell;
*self.shell_args.write().unwrap() = shell_args;
}
#[instrument(skip(self, job), fields(job_name = %job_name))]
pub async fn execute(&self, job_name: &str, job: &Job) -> Result<(i32, String, String)> {
let (shell, shell_args) = self.get_shell(job);
debug!(
command = %job.command,
shell = %shell,
"Executing job"
);
let mut cmd = Command::new(&shell);
for arg in &shell_args {
cmd.arg(arg);
}
cmd.arg(&job.command);
if let Some(ref dir) = job.working_dir {
cmd.current_dir(dir);
}
for (key, value) in &job.environment {
cmd.env(key, value);
}
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.kill_on_drop(true);
let mut child = cmd.spawn().map_err(|e| Error::JobSpawn {
job_name: job_name.to_string(),
source: e,
})?;
let mut stdout_handle = child.stdout.take().expect("stdout was piped");
let mut stderr_handle = child.stderr.take().expect("stderr was piped");
let max_output = job.max_output_size;
let read_stdout = async move {
let mut stdout = Vec::with_capacity(4096.min(max_output));
let mut buf = vec![0u8; 8192];
loop {
match stdout_handle.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
if stdout.len() < max_output {
let remaining = max_output - stdout.len();
stdout.extend_from_slice(&buf[..n.min(remaining)]);
}
}
Err(_) => break,
}
}
String::from_utf8_lossy(&stdout).into_owned()
};
let read_stderr = async move {
let mut stderr = Vec::with_capacity(1024.min(max_output));
let mut buf = vec![0u8; 8192];
loop {
match stderr_handle.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
if stderr.len() < max_output {
let remaining = max_output - stderr.len();
stderr.extend_from_slice(&buf[..n.min(remaining)]);
}
}
Err(_) => break,
}
}
String::from_utf8_lossy(&stderr).into_owned()
};
let read_output = async { tokio::join!(read_stdout, read_stderr) };
let result = if job.has_timeout() {
let duration = Duration::from_secs(job.timeout);
match timeout(duration, async {
let output = read_output.await;
let status = child.wait().await;
(status, output)
})
.await
{
Ok((status, output)) => {
let exit_code = status
.map_err(|e| Error::JobSpawn {
job_name: job_name.to_string(),
source: e,
})?
.code()
.unwrap_or(-1);
Ok((exit_code, output.0, output.1))
}
Err(_) => {
warn!(job = %job_name, timeout = %job.timeout, "Job timed out");
Err(Error::job_timeout(job_name, job.timeout))
}
}
} else {
let output = read_output.await;
let status = child.wait().await.map_err(|e| Error::JobSpawn {
job_name: job_name.to_string(),
source: e,
})?;
let exit_code = status.code().unwrap_or(-1);
Ok((exit_code, output.0, output.1))
};
result
}
fn get_shell(&self, job: &Job) -> (String, Vec<String>) {
if let Some(ref shell) = job.shell {
if cfg!(windows) {
(shell.clone(), vec!["/C".to_string()])
} else {
(shell.clone(), vec!["-c".to_string()])
}
} else {
(
self.shell.read().unwrap().clone(),
self.shell_args.read().unwrap().clone(),
)
}
}
}
impl Default for JobExecutor {
fn default() -> Self {
if cfg!(windows) {
Self::new("cmd".to_string(), vec!["/C".to_string()])
} else {
Self::new("/bin/sh".to_string(), vec!["-c".to_string()])
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_job(command: &str) -> Job {
Job {
command: command.to_string(),
..Default::default()
}
}
#[tokio::test]
async fn test_execute_simple_command() {
let executor = JobExecutor::default();
let cmd = if cfg!(windows) {
"echo hello"
} else {
"echo hello"
};
let job = test_job(cmd);
let result = executor.execute("test", &job).await;
assert!(result.is_ok());
let (exit_code, stdout, _) = result.unwrap();
assert_eq!(exit_code, 0);
assert!(stdout.contains("hello"));
}
#[tokio::test]
async fn test_execute_failing_command() {
let executor = JobExecutor::default();
let cmd = if cfg!(windows) { "exit 1" } else { "exit 1" };
let job = test_job(cmd);
let result = executor.execute("test", &job).await;
assert!(result.is_ok());
let (exit_code, _, _) = result.unwrap();
assert_ne!(exit_code, 0);
}
#[tokio::test]
async fn test_execute_with_timeout() {
let executor = JobExecutor::default();
let cmd = if cfg!(windows) {
"ping -n 10 127.0.0.1"
} else {
"sleep 10"
};
let job = Job {
command: cmd.to_string(),
timeout: 1, ..Default::default()
};
let result = executor.execute("test", &job).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Error::JobTimeout { .. }));
}
#[tokio::test]
async fn test_execute_with_env() {
let executor = JobExecutor::default();
let cmd = if cfg!(windows) {
"echo %TEST_VAR%"
} else {
"echo $TEST_VAR"
};
let mut job = test_job(cmd);
job.environment
.insert("TEST_VAR".to_string(), "test_value".to_string());
let result = executor.execute("test", &job).await;
assert!(result.is_ok());
let (exit_code, stdout, _) = result.unwrap();
assert_eq!(exit_code, 0);
assert!(stdout.contains("test_value"));
}
#[tokio::test]
async fn test_output_capture() {
let executor = JobExecutor::default();
let cmd = if cfg!(windows) {
"echo stdout_test && echo stderr_test 1>&2"
} else {
"echo stdout_test && echo stderr_test >&2"
};
let job = test_job(cmd);
let result = executor.execute("test", &job).await;
assert!(result.is_ok());
let (_, stdout, stderr) = result.unwrap();
assert!(stdout.contains("stdout_test"));
assert!(stderr.contains("stderr_test"));
}
}