use std::process::{ExitStatus, Output};
use async_trait::async_trait;
use log::error;
use time::OffsetDateTime;
use tokio::{
process::Command,
sync::oneshot::{channel, Receiver, Sender},
};
use crate::error::Error;
#[async_trait]
pub trait ProcessMonitor {
async fn start(&self, mut cmd: Command, tx: Sender<Exit>) -> std::io::Result<Output> {
let chi = cmd.spawn()?;
let pid = chi
.id()
.expect("failed to take pid of the container process.");
let out = chi.wait_with_output().await?;
let ts = OffsetDateTime::now_utc();
let status = out.status.code().unwrap_or(-1);
match tx.send(Exit { ts, pid, status }) {
Ok(_) => Ok(out),
Err(e) => {
error!("command {:?} exited but receiver dropped.", cmd);
error!("couldn't send messages: {:?}", e);
Err(std::io::ErrorKind::ConnectionRefused.into())
}
}
}
async fn wait(&self, rx: Receiver<Exit>) -> std::io::Result<Exit> {
rx.await.map_err(|_| {
error!("sender dropped.");
std::io::ErrorKind::BrokenPipe.into()
})
}
}
#[derive(Debug, Clone, Default)]
pub struct DefaultMonitor {}
impl ProcessMonitor for DefaultMonitor {}
impl DefaultMonitor {
pub const fn new() -> Self {
Self {}
}
}
#[derive(Debug)]
pub struct Exit {
pub ts: OffsetDateTime,
pub pid: u32,
pub status: i32,
}
pub struct ExecuteResult {
pub exit: Exit,
pub status: ExitStatus,
pub stdout: String,
pub stderr: String,
}
pub async fn execute<T: ProcessMonitor + Send + Sync>(
monitor: &T,
cmd: Command,
) -> Result<ExecuteResult, Error> {
let (tx, rx) = channel::<Exit>();
let start = monitor.start(cmd, tx);
let wait = monitor.wait(rx);
let (
Output {
stdout,
stderr,
status,
},
exit,
) = tokio::try_join!(start, wait).map_err(Error::InvalidCommand)?;
let stdout = String::from_utf8_lossy(&stdout).to_string();
let stderr = String::from_utf8_lossy(&stderr).to_string();
Ok(ExecuteResult {
exit,
status,
stdout,
stderr,
})
}
#[cfg(test)]
mod tests {
use std::process::Stdio;
use tokio::{process::Command, sync::oneshot::channel};
use super::*;
#[tokio::test]
async fn test_start_wait_without_output() {
let monitor = DefaultMonitor::new();
let cmd = Command::new("/bin/ls");
let (tx, rx) = channel();
let output = monitor.start(cmd, tx).await.unwrap();
assert_eq!(output.stdout.len(), 0);
assert_eq!(output.stderr.len(), 0);
let status = monitor.wait(rx).await.unwrap();
assert_eq!(status.status, 0);
}
#[tokio::test]
async fn test_start_wait_with_output() {
let monitor = DefaultMonitor::new();
let mut cmd = Command::new("/bin/ls");
cmd.stdout(Stdio::piped());
let (tx, rx) = channel();
let output = monitor.start(cmd, tx).await.unwrap();
assert!(!output.stdout.is_empty());
assert_eq!(output.stderr.len(), 0);
let status = monitor.wait(rx).await.unwrap();
assert_eq!(status.status, 0);
}
#[tokio::test]
async fn test_execute() {
let mut cmd = Command::new("/bin/ls");
cmd.stdout(Stdio::piped());
let monitor = DefaultMonitor::new();
let result = execute(&monitor, cmd).await.unwrap();
assert_eq!(result.exit.status, 0);
assert!(result.status.success());
assert!(!result.stdout.is_empty());
assert_eq!(result.stderr.len(), 0);
}
}