use crate::cmd::cmd_error::CmdError;
use bytes::Bytes;
use log::{debug, error, trace, warn};
use std::process::Stdio;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::process::{Child, ChildStdout, Command};
use tokio::sync::broadcast::Sender;
use tokio::sync::oneshot;
/// Spawns `cmd` with `args` and forwards its standard output to `data_sender`.
///
/// The child's stdout is piped and its stderr is discarded. A background task
/// (started with `tokio::spawn`) reads stdout in chunks of up to
/// `read_buffer_size` bytes and signals `process_exit_sender` once reading stops.
///
/// # Errors
///
/// * `CmdError::Execute` wraps the I/O error when the process cannot be spawned.
/// * The error produced by `CmdError::TakeStdout` is returned when the child's
///   stdout handle is not available.
pub fn execute(
    cmd: &str,
    args: &[&str],
    data_sender: Sender<Bytes>,
    process_exit_sender: oneshot::Sender<()>,
    read_buffer_size: usize,
) -> Result<Child, CmdError> {
    debug!("command execute start: {} {}", cmd, args.join(" "));

    // Configure the command step by step: piped stdout, silenced stderr.
    let mut command = Command::new(cmd);
    command
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::null());

    let mut child = command.spawn().map_err(CmdError::Execute)?;
    debug!("command execute started: {}", cmd);

    // Move the stdout handle out of the child so the reader task can own it.
    let stdout = child.stdout.take().ok_or_else(CmdError::TakeStdout)?;
    let reader_task = read_stdout(stdout, data_sender, process_exit_sender, read_buffer_size);
    tokio::spawn(reader_task);

    Ok(child)
}
/// Pumps the child's stdout into the broadcast channel until EOF or a read error.
///
/// Each chunk read (at most `read_buffer_size` bytes) is copied into a `Bytes`
/// and sent through `data_sender`, but only while at least one receiver is
/// subscribed; chunks read while nobody is subscribed are dropped. When the
/// loop ends, `process_exit_sender` is signalled; a failure to deliver that
/// signal is ignored.
///
/// A `read_buffer_size` of zero is replaced by a fallback size, because a read
/// into an empty buffer yields `Ok(0)`, which this loop interprets as EOF.
async fn read_stdout(
    stdout: ChildStdout,
    data_sender: Sender<Bytes>,
    process_exit_sender: oneshot::Sender<()>,
    read_buffer_size: usize,
) {
    /// Buffer size used when the caller passes `0`.
    const FALLBACK_READ_BUFFER_SIZE: usize = 8 * 1024;

    // Guard against a zero-length buffer: it would make the first read return
    // `Ok(0)` and the exit signal would fire while the process is still running.
    let buffer_len = if read_buffer_size == 0 {
        warn!(
            "read buffer size is 0, falling back to {} bytes",
            FALLBACK_READ_BUFFER_SIZE
        );
        FALLBACK_READ_BUFFER_SIZE
    } else {
        read_buffer_size
    };

    let mut reader = BufReader::new(stdout);
    let mut buffer = vec![0u8; buffer_len];

    loop {
        match reader.read(&mut buffer).await {
            Ok(0) => {
                debug!("command process stdout closed");
                break;
            }
            Ok(n) => {
                // Skip the copy and the send entirely when nobody is listening.
                let receiver_count = data_sender.receiver_count();
                if receiver_count > 0 {
                    trace!("command process receiver count: {}", receiver_count);
                    let data = Bytes::copy_from_slice(&buffer[..n]);
                    if let Err(e) = data_sender.send(data) {
                        warn!("Failed to send command process output to receiver: {:#}", e);
                    }
                }
            }
            Err(e) => {
                error!("read command process stdout error: {:#}", e);
                break;
            }
        }
    }

    // The receiving side may already be gone; that is not an error here.
    let _ = process_exit_sender.send(());
}
pub fn is_process_alive(child: &mut Child) -> Result<bool, CmdError> {
debug!(
"checking if process is alive: {}",
child.id().ok_or(CmdError::EmptyId)?
);
Ok(match child.try_wait() {
Ok(Some(_)) => false, Ok(None) => true, Err(_) => false, })
}
/// Kills the child process and waits for the kill call to complete.
///
/// Takes ownership of `child`, so the handle is dropped when this returns.
///
/// # Errors
///
/// * `CmdError::EmptyId` when the child has no process id.
/// * `CmdError::Kill` wrapping the I/O error when `Child::kill` fails; the
///   error is also logged.
pub async fn kill_process(mut child: Child) -> Result<(), CmdError> {
    // Resolve the PID outside the log macro: `log` macros only evaluate their
    // arguments when the level is enabled, so a `?` inside `debug!` would make
    // the `EmptyId` early return depend on the configured log level.
    let pid = child.id().ok_or(CmdError::EmptyId)?;
    debug!("killing process: {}", pid);

    child.kill().await.map_err(|e| {
        error!("kill process fail: {:#}", e);
        CmdError::Kill(e)
    })
}