use std::{ffi::OsStr, path::PathBuf, process::Stdio};
use anyhow::{anyhow, Context, Error};
use async_trait::async_trait;
use ezexec::lookup::Shell;
use log::{debug, info};
use tokio::fs::OpenOptions;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::Command;
/// Builds shell-wrapped [`Command`]s.
///
/// The shell is looked up once in [`Executor::try_new`] and reused as the
/// program of every command produced by [`Executor::command`].
pub struct Executor {
/// Shell used as the program for every command this executor builds.
shell: Shell,
}
impl Executor {
    /// Creates an executor bound to the shell reported by `Shell::find`.
    ///
    /// # Errors
    ///
    /// Fails when the shell lookup returns an error.
    pub fn try_new() -> Result<Self, Error> {
        ezexec::lookup::Shell::find()
            .map(|shell| Self { shell })
            .map_err(|e| anyhow!("Could not find a shell to execute command: {}", e))
    }

    /// Builds a [`Command`] that runs `cmd` through the executor's shell.
    ///
    /// The shell is the program; its "execute string" arguments come first and
    /// `cmd` is appended as the final argument. Both stdout and stderr are
    /// configured as pipes. The command is only built here, not spawned.
    ///
    /// # Errors
    ///
    /// Fails when the shell cannot provide its execute-string arguments.
    pub fn command<P>(&self, cmd: P) -> Result<Command, Error>
    where
        P: AsRef<str>,
    {
        let program: &OsStr = self.shell.as_ref();
        let mut command = Command::new(program);
        let script: &str = cmd.as_ref();

        let shell_args = self
            .shell
            .execstring_args()
            .map_err(|e| anyhow!("Could not find a shell string: {}", e))?;

        // Shell flags first, then the script itself, then the piped outputs.
        command
            .args(shell_args.iter())
            .arg(script)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());

        Ok(command)
    }
}
fn spawn_logger<T>(name: &'static str, reader: BufReader<T>, dest: PathBuf, command: &str)
where
BufReader<T>: AsyncBufReadExt + Unpin,
T: 'static + Send,
{
debug!("Storing {} logs in {:?}", name, dest);
let command = format!("\ncommand: {}\n", command.to_string());
tokio::task::spawn(async move {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(dest)
.await
.with_context(|| format!("Could not create log file {}", name))?;
{
let mut writer = BufWriter::new(&mut file);
writer
.write_all(command.as_bytes())
.await
.with_context(|| format!("Could not write log file {}", name))?;
writer
.flush()
.await
.with_context(|| format!("Could not write log file {}", name))?;
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
info!("{}: {}", name, line);
writer
.write_all(line.as_bytes())
.await
.with_context(|| format!("Could not write log file {}", name))?;
writer
.write_all(b"\n")
.await
.with_context(|| format!("Could not write log file {}", name))?;
writer
.flush()
.await
.with_context(|| format!("Could not write log file {}", name))?;
}
}
let _ = file.sync_data().await;
Ok(()) as Result<(), anyhow::Error>
});
}
/// Extension trait for running a command to completion while capturing its
/// output in log files.
#[async_trait]
pub trait CommandExt {
/// Spawns the command, logs its output under `log_dir`, and waits for it to exit.
///
/// * `log_dir` - directory in which the log files are placed.
/// * `name` - label for the process; used in log file names and in messages.
/// * `line` - textual form of the command, recorded as a header in the logs.
///
/// # Errors
///
/// The implementation for `Command` in this file returns an error when the
/// process cannot be spawned, when waiting for it fails, or when it exits
/// with a non-success status.
async fn spawn_logged(
&mut self,
log_dir: &PathBuf,
name: &'static str,
line: &str,
) -> Result<(), Error>;
}
#[async_trait]
impl CommandExt for Command {
    /// Spawns the command, hands its piped stdout/stderr to background loggers,
    /// and waits for the process to exit.
    ///
    /// Stdout goes to `<log_dir>/<name>.out` and stderr to `<log_dir>/<name>.log`.
    /// A stream that was not captured as a pipe is skipped. The logger tasks
    /// are started by `spawn_logger` and are not awaited here; this method only
    /// waits for the child process itself.
    ///
    /// # Errors
    ///
    /// Returns an error when spawning fails, when waiting on the child fails,
    /// or when the child exits with a non-success status.
    async fn spawn_logged(
        &mut self,
        log_dir: &PathBuf,
        name: &'static str,
        line: &str,
    ) -> Result<(), Error> {
        let mut child = self
            .spawn()
            .with_context(|| format!("Could not spawn process for `{}`", name))?;

        if let Some(out_pipe) = child.stdout.take() {
            spawn_logger(
                name,
                BufReader::new(out_pipe),
                log_dir.join(format!("{name}.out", name = name)),
                line,
            );
        }

        if let Some(err_pipe) = child.stderr.take() {
            spawn_logger(
                name,
                BufReader::new(err_pipe),
                log_dir.join(format!("{name}.log", name = name)),
                line,
            );
        }

        let status = child.wait().await.context("Child process not launched")?;
        if status.success() {
            Ok(())
        } else {
            Err(anyhow!("Child `{}` failed: `{}`", name, status))
        }
    }
}