#![allow(clippy::tabs_in_doc_comments)]
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::process::CommandExt;
use std::{
collections::HashMap,
io::{BufReader, Write},
path::PathBuf,
process::{Command as StdCommand, Stdio},
sync::{Arc, Mutex, RwLock},
thread::spawn
};
// Process-creation flag handed to `creation_flags` by `get_std_command!` on Windows.
// NOTE(review): going by its name, this keeps the child from opening a console window —
// confirm the value against the Win32 process creation flags reference.
#[cfg(windows)]
const CREATE_NO_WINDOW: u32 = 0x0800_0000;
use millennium_utils::platform;
use os_pipe::{pipe, PipeReader, PipeWriter};
use serde::Serialize;
use shared_child::SharedChild;
use crate::async_runtime::{block_on as block_on_task, channel, Receiver, Sender};
/// Shared registry of spawned child processes, keyed by the id reported by `SharedChild::id`.
type ChildStore = Arc<Mutex<HashMap<u32, Arc<SharedChild>>>>;
fn commands() -> &'static ChildStore {
use once_cell::sync::Lazy;
static STORE: Lazy<ChildStore> = Lazy::new(Default::default);
&STORE
}
/// Kills every child process that is currently present in the registry.
///
/// Failures to kill an individual child are ignored so that the remaining children are still attempted.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn kill_children() {
	let registry = commands().lock().unwrap();
	registry.values().for_each(|child| {
		// Best effort: the result of `kill` is deliberately discarded.
		let _ = child.kill();
	});
}
/// Payload of [`CommandEvent::Terminated`], describing how a child process ended.
#[derive(Debug, Clone, Serialize)]
pub struct TerminatedPayload {
/// Exit code reported for the process, if there is one.
pub code: Option<i32>,
/// Signal reported for the process on Unix; `Command::spawn` always sets this to `None` on Windows.
pub signal: Option<i32>
}
/// An event emitted for a child process started with [`Command::spawn`].
///
/// When serialized, the variant name is stored under the `event` key and the variant's data under the
/// `payload` key (see the `serde` attribute below).
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event", content = "payload")]
#[non_exhaustive]
pub enum CommandEvent {
/// A line read from the child's stderr pipe.
Stderr(String),
/// A line read from the child's stdout pipe.
Stdout(String),
/// An error message: produced when an output line is not valid UTF-8, when reading a pipe fails, or when
/// waiting for the child fails.
Error(String),
/// The child process finished; carries its exit code and signal.
Terminated(TerminatedPayload)
}
// Builds a `std::process::Command` (`StdCommand`) from the fields of a `Command` builder.
//
// `$self` must name a value with the fields `program`, `args`, `env_clear`, `env` and `current_dir`.
// `program`, `env` and `current_dir` are moved out of it, so the value cannot be used as a whole afterwards.
// The macro evaluates to the configured, not yet spawned, `StdCommand`.
macro_rules! get_std_command {
($self: ident) => {{
let mut command = StdCommand::new($self.program);
command.args(&$self.args);
// All three standard streams default to pipes here; `Command::spawn` later replaces them with its own
// `os_pipe` ends.
command.stdout(Stdio::piped());
command.stdin(Stdio::piped());
command.stderr(Stdio::piped());
// Clear the environment first so that the variables in `env` are applied on top of the cleared set.
if $self.env_clear {
command.env_clear();
}
command.envs($self.env);
if let Some(current_dir) = $self.current_dir {
command.current_dir(current_dir);
}
// Windows only: apply the `CREATE_NO_WINDOW` creation flag.
#[cfg(windows)]
command.creation_flags(CREATE_NO_WINDOW);
command
}};
}
/// A builder describing a child process to launch.
///
/// Configure it with the builder methods, then run it with `spawn`, `status` or `output`.
#[derive(Debug)]
pub struct Command {
// Program name or path handed to `std::process::Command::new`.
program: String,
// Arguments passed to the program, in order.
args: Vec<String>,
// When `true`, the child's environment is cleared before `env` is applied.
env_clear: bool,
// Environment variables set for the child.
env: HashMap<String, String>,
// Working directory for the child, if one was set.
current_dir: Option<PathBuf>
}
/// Handle to a child process started by [`Command::spawn`].
#[derive(Debug)]
pub struct CommandChild {
	// Shared handle to the running process.
	inner: Arc<SharedChild>,
	// Write end of the pipe connected to the child's stdin.
	stdin_writer: PipeWriter
}

impl CommandChild {
	/// Writes the whole of `buf` to the child's stdin pipe.
	///
	/// # Errors
	///
	/// Returns an error if the underlying write fails.
	pub fn write(&mut self, buf: &[u8]) -> crate::api::Result<()> {
		self.stdin_writer.write_all(buf).map_err(Into::into)
	}

	/// Sends a kill request to the child process, consuming the handle.
	///
	/// # Errors
	///
	/// Returns an error if the kill request fails.
	pub fn kill(self) -> crate::api::Result<()> {
		self.inner.kill().map_err(Into::into)
	}

	/// Returns the process id of the child.
	pub fn pid(&self) -> u32 {
		let Self { inner, .. } = self;
		inner.id()
	}
}
/// The exit status of a finished child process.
#[derive(Debug)]
pub struct ExitStatus {
	// Exit code of the process; `None` when no code was reported.
	code: Option<i32>
}

impl ExitStatus {
	/// Returns the exit code of the process, or `None` when no code was reported.
	pub fn code(&self) -> Option<i32> {
		let Self { code } = self;
		*code
	}

	/// Returns `true` only when the process reported exit code `0`.
	pub fn success(&self) -> bool {
		matches!(self.code, Some(0))
	}
}
/// The collected result of running a command to completion with `Command::output`.
#[derive(Debug)]
pub struct Output {
/// Exit status of the process.
pub status: ExitStatus,
/// Everything received as stdout events; `Command::output` appends a `\n` after each line.
pub stdout: String,
/// Everything received as stderr events; `Command::output` appends a `\n` after each line.
pub stderr: String
}
fn relative_command_path(command: String) -> crate::Result<String> {
match platform::current_exe()?.parent() {
#[cfg(windows)]
Some(exe_dir) => Ok(format!("{}\\{}.exe", exe_dir.display(), command)),
#[cfg(not(windows))]
Some(exe_dir) => Ok(format!("{}/{}", exe_dir.display(), command)),
None => Err(crate::api::Error::Command("Could not evaluate executable dir".to_string()).into())
}
}
impl Command {
	/// Creates a command builder for launching `program`.
	///
	/// The builder starts with no arguments, no extra environment variables, `env_clear` disabled and no
	/// working directory.
	pub fn new<S: Into<String>>(program: S) -> Self {
		Self {
			program: program.into(),
			args: Vec::new(),
			env_clear: false,
			env: HashMap::new(),
			current_dir: None
		}
	}

	/// Creates a command builder for a sidecar program, i.e. a binary resolved relative to the directory of the
	/// current executable (see `relative_command_path`).
	///
	/// # Errors
	///
	/// Returns an error when the sidecar path cannot be derived from the current executable's location.
	pub fn new_sidecar<S: Into<String>>(program: S) -> crate::Result<Self> {
		let sidecar_path = relative_command_path(program.into())?;
		Ok(Self::new(sidecar_path))
	}

	/// Appends `args` to the arguments passed to the program, keeping their order.
	#[must_use]
	pub fn args<I, S>(mut self, args: I) -> Self
	where
		I: IntoIterator<Item = S>,
		S: AsRef<str>
	{
		self.args.extend(args.into_iter().map(|arg| arg.as_ref().to_owned()));
		self
	}

	/// Requests that the child's environment is cleared before the variables set with [`Command::envs`] are applied.
	#[must_use]
	pub fn env_clear(mut self) -> Self {
		self.env_clear = true;
		self
	}

	/// Sets the environment variables for the child, replacing any map set by an earlier call.
	#[must_use]
	pub fn envs(mut self, env: HashMap<String, String>) -> Self {
		self.env = env;
		self
	}

	/// Sets the working directory for the child, replacing any directory set by an earlier call.
	#[must_use]
	pub fn current_dir(mut self, current_dir: PathBuf) -> Self {
		self.current_dir = Some(current_dir);
		self
	}

	/// Spawns the process and returns a receiver of [`CommandEvent`]s together with a [`CommandChild`] handle.
	///
	/// Stdout and stderr are each read line by line on a dedicated thread and forwarded as events. A third thread
	/// waits for the process and emits [`CommandEvent::Terminated`], or [`CommandEvent::Error`] if waiting fails.
	/// The child is registered in the global registry while it runs so that `kill_children` can reach it.
	///
	/// # Errors
	///
	/// Returns an error if one of the pipes cannot be created or the process cannot be spawned.
	///
	/// # Panics
	///
	/// Panics if the global registry mutex is poisoned.
	pub fn spawn(self) -> crate::api::Result<(Receiver<CommandEvent>, CommandChild)> {
		let mut command = get_std_command!(self);

		// Replace the default stdio configuration with pipes whose parent-side ends we keep.
		let (stdout_reader, stdout_writer) = pipe()?;
		let (stderr_reader, stderr_writer) = pipe()?;
		let (stdin_reader, stdin_writer) = pipe()?;
		command.stdout(stdout_writer);
		command.stderr(stderr_writer);
		command.stdin(stdin_reader);

		let child = Arc::new(SharedChild::spawn(&mut command)?);
		let waiter_child = Arc::clone(&child);
		let guard = Arc::new(RwLock::new(()));

		commands().lock().unwrap().insert(child.id(), Arc::clone(&child));

		let (tx, rx) = channel(1);

		spawn_pipe_reader(tx.clone(), Arc::clone(&guard), stdout_reader, CommandEvent::Stdout);
		spawn_pipe_reader(tx.clone(), Arc::clone(&guard), stderr_reader, CommandEvent::Stderr);

		spawn(move || {
			let wait_result = waiter_child.wait();

			// The reader threads hold the read side of `guard` while they run, so taking the write side here
			// waits for any reader that is still forwarding output before the final event is sent.
			let _write_guard = guard.write().unwrap();

			// Drop the registry entry on both outcomes. If it were kept after a failed `wait`, the entry would
			// never be removed and `kill_children` could later signal a process id that is no longer ours.
			commands().lock().unwrap().remove(&waiter_child.id());

			let event = match wait_result {
				Ok(status) => {
					#[cfg(windows)]
					let signal = None;
					#[cfg(unix)]
					let signal = status.signal();
					CommandEvent::Terminated(TerminatedPayload { code: status.code(), signal })
				}
				Err(e) => CommandEvent::Error(e.to_string())
			};

			// The receiver may already be gone; there is nobody left to notify in that case.
			let _ = block_on_task(async move { tx.send(event).await });
		});

		Ok((rx, CommandChild { inner: child, stdin_writer }))
	}

	/// Runs the command to completion and returns its exit status, discarding its output.
	///
	/// Blocks the calling thread until the event stream of the child ends. The child handle, and with it the
	/// write end of the child's stdin pipe, is kept alive for that whole time.
	///
	/// # Errors
	///
	/// Returns an error if the process cannot be spawned.
	pub fn status(self) -> crate::api::Result<ExitStatus> {
		let (mut rx, _child) = self.spawn()?;
		let code = crate::async_runtime::safe_block_on(async move {
			let mut exit_code = None;
			while let Some(event) = rx.recv().await {
				if let CommandEvent::Terminated(TerminatedPayload { code, .. }) = event {
					exit_code = code;
				}
			}
			exit_code
		});
		Ok(ExitStatus { code })
	}

	/// Runs the command to completion and returns its exit status together with the collected stdout and stderr.
	///
	/// Every received line is stored followed by a `\n`. [`CommandEvent::Error`] events are ignored. Blocks the
	/// calling thread until the event stream of the child ends; the child handle, and with it the write end of the
	/// child's stdin pipe, is kept alive for that whole time.
	///
	/// # Errors
	///
	/// Returns an error if the process cannot be spawned.
	pub fn output(self) -> crate::api::Result<Output> {
		let (mut rx, _child) = self.spawn()?;
		let output = crate::async_runtime::safe_block_on(async move {
			let mut code = None;
			let mut stdout = String::new();
			let mut stderr = String::new();
			while let Some(event) = rx.recv().await {
				match event {
					CommandEvent::Terminated(payload) => code = payload.code,
					CommandEvent::Stdout(line) => {
						stdout.push_str(&line);
						stdout.push('\n');
					}
					CommandEvent::Stderr(line) => {
						stderr.push_str(&line);
						stderr.push('\n');
					}
					// Errors are deliberately not part of the collected output.
					CommandEvent::Error(_) => {}
				}
			}
			Output {
				status: ExitStatus { code },
				stdout,
				stderr
			}
		});
		Ok(output)
	}
}
fn spawn_pipe_reader<F: Fn(String) -> CommandEvent + Send + Copy + 'static>(
tx: Sender<CommandEvent>,
guard: Arc<RwLock<()>>,
pipe_reader: PipeReader,
wrapper: F
) {
spawn(move || {
let _lock = guard.read().unwrap();
let mut reader = BufReader::new(pipe_reader);
let mut buf = Vec::new();
loop {
buf.clear();
match millennium_utils::io::read_line(&mut reader, &mut buf) {
Ok(n) => {
if n == 0 {
break;
}
let tx_ = tx.clone();
let line = String::from_utf8(buf.clone());
block_on_task(async move {
let _ = match line {
Ok(line) => tx_.send(wrapper(line)).await,
Err(e) => tx_.send(CommandEvent::Error(e.to_string())).await
};
});
}
Err(e) => {
let tx_ = tx.clone();
let _ = block_on_task(async move { tx_.send(CommandEvent::Error(e.to_string())).await });
}
}
}
});
}
#[cfg(test)]
mod test {
	#[cfg(not(windows))]
	use super::*;

	// `cat` on an existing fixture: every stdout line must match the fixture text and the exit code must be 0.
	#[cfg(not(windows))]
	#[test]
	fn test_cmd_output() {
		let (mut rx, _) = Command::new("cat").args(&["test/api/test.txt"]).spawn().unwrap();

		crate::async_runtime::block_on(async move {
			while let Some(event) = rx.recv().await {
				match event {
					CommandEvent::Terminated(TerminatedPayload { code, .. }) => assert_eq!(code, Some(0)),
					CommandEvent::Stdout(line) => assert_eq!(line, "This is a test doc!"),
					_ => {}
				}
			}
		});
	}

	// `cat` on a directory: the failure must be reported on stderr and the exit code must be 1.
	#[cfg(not(windows))]
	#[test]
	fn test_cmd_fail() {
		let (mut rx, _) = Command::new("cat").args(&["test/api/"]).spawn().unwrap();

		crate::async_runtime::block_on(async move {
			while let Some(event) = rx.recv().await {
				match event {
					CommandEvent::Terminated(TerminatedPayload { code, .. }) => assert_eq!(code, Some(1)),
					CommandEvent::Stderr(line) => assert_eq!(line, "cat: test/api/: Is a directory"),
					_ => {}
				}
			}
		});
	}
}