use hashbrown::HashMap;
use std::io::{self, ErrorKind};
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use anyhow::{Context, Result};
use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::task::JoinHandle;
use crate::process::{ChildTerminator, ProcessHandle, SpawnedProcess};
use crate::process_group;
struct PipeChildTerminator {
#[cfg(windows)]
pid: u32,
#[cfg(unix)]
process_group_id: u32,
}
impl ChildTerminator for PipeChildTerminator {
fn kill(&mut self) -> io::Result<()> {
#[cfg(unix)]
{
process_group::kill_process_group(self.process_group_id)
}
#[cfg(windows)]
{
process_group::kill_process(self.pid)
}
#[cfg(not(any(unix, windows)))]
{
Ok(())
}
}
}
async fn read_output_stream<R>(mut reader: R, output_tx: broadcast::Sender<Bytes>)
where
R: AsyncRead + Unpin,
{
let mut buf = vec![0u8; 8_192];
loop {
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let _ = output_tx.send(Bytes::copy_from_slice(&buf[..n]));
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
}
#[derive(Clone, Copy)]
pub enum PipeStdinMode {
Piped,
Null,
}
#[derive(Clone)]
pub struct PipeSpawnOptions {
pub program: String,
pub args: Vec<String>,
pub cwd: std::path::PathBuf,
pub env: Option<HashMap<String, String>>,
pub arg0: Option<String>,
pub stdin_mode: PipeStdinMode,
}
impl PipeSpawnOptions {
pub fn new(program: impl Into<String>, cwd: impl Into<std::path::PathBuf>) -> Self {
Self {
program: program.into(),
args: Vec::new(),
cwd: cwd.into(),
env: None,
arg0: None,
stdin_mode: PipeStdinMode::Piped,
}
}
pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.args = args.into_iter().map(Into::into).collect();
self
}
pub fn env(mut self, env: HashMap<String, String>) -> Self {
self.env = Some(env);
self
}
pub fn arg0(mut self, arg0: impl Into<String>) -> Self {
self.arg0 = Some(arg0.into());
self
}
pub fn stdin_mode(mut self, mode: PipeStdinMode) -> Self {
self.stdin_mode = mode;
self
}
}
async fn spawn_process_internal(opts: PipeSpawnOptions) -> Result<SpawnedProcess> {
if opts.program.is_empty() {
anyhow::bail!("missing program for pipe spawn");
}
let mut command = Command::new(&opts.program);
#[cfg(unix)]
if let Some(ref arg0) = opts.arg0 {
command.arg0(arg0);
}
#[cfg(unix)]
{
command.process_group(0);
}
#[cfg(not(unix))]
let _ = &opts.arg0;
command.current_dir(&opts.cwd);
if let Some(ref env) = opts.env {
command.env_clear();
for (key, value) in env {
command.env(key, value);
}
}
for arg in &opts.args {
command.arg(arg);
}
match opts.stdin_mode {
PipeStdinMode::Piped => {
command.stdin(Stdio::piped());
}
PipeStdinMode::Null => {
command.stdin(Stdio::null());
}
}
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
let mut child = command.spawn().context("failed to spawn pipe process")?;
let pid = child
.id()
.ok_or_else(|| io::Error::other("missing child pid"))?;
#[cfg(unix)]
let process_group_id = pid;
let stdin = child.stdin.take();
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = broadcast::channel::<Bytes>(256);
let initial_output_rx = output_tx.subscribe();
let writer_handle = if let Some(stdin) = stdin {
let writer = Arc::new(tokio::sync::Mutex::new(stdin));
tokio::spawn(async move {
while let Some(bytes) = writer_rx.recv().await {
let mut guard = writer.lock().await;
let _ = guard.write_all(&bytes).await;
let _ = guard.flush().await;
}
})
} else {
drop(writer_rx);
tokio::spawn(async {})
};
let stdout_handle = stdout.map(|stdout| {
let output_tx = output_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stdout), output_tx).await;
})
});
let stderr_handle = stderr.map(|stderr| {
let output_tx = output_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stderr), output_tx).await;
})
});
let mut reader_abort_handles = Vec::new();
if let Some(ref handle) = stdout_handle {
reader_abort_handles.push(handle.abort_handle());
}
if let Some(ref handle) = stderr_handle {
reader_abort_handles.push(handle.abort_handle());
}
let reader_handle = tokio::spawn(async move {
if let Some(handle) = stdout_handle {
let _ = handle.await;
}
if let Some(handle) = stderr_handle {
let _ = handle.await;
}
});
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = Arc::clone(&exit_status);
let exit_code = Arc::new(StdMutex::new(None));
let wait_exit_code = Arc::clone(&exit_code);
let wait_handle: JoinHandle<()> = tokio::spawn(async move {
let code = match child.wait().await {
Ok(status) => status.code().unwrap_or(-1),
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
if let Ok(mut guard) = wait_exit_code.lock() {
*guard = Some(code);
}
let _ = exit_tx.send(code);
});
let (handle, output_rx) = ProcessHandle::new(
writer_tx,
output_tx,
initial_output_rx,
Box::new(PipeChildTerminator {
#[cfg(windows)]
pid,
#[cfg(unix)]
process_group_id,
}),
reader_handle,
reader_abort_handles,
writer_handle,
wait_handle,
exit_status,
exit_code,
None,
);
Ok(SpawnedProcess {
session: handle,
output_rx,
exit_rx,
})
}
pub async fn spawn_process(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
let opts = PipeSpawnOptions {
program: program.to_string(),
args: args.to_vec(),
cwd: cwd.to_path_buf(),
env: Some(env.clone()),
arg0: arg0.clone(),
stdin_mode: PipeStdinMode::Piped,
};
spawn_process_internal(opts).await
}
pub async fn spawn_process_no_stdin(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
let opts = PipeSpawnOptions {
program: program.to_string(),
args: args.to_vec(),
cwd: cwd.to_path_buf(),
env: Some(env.clone()),
arg0: arg0.clone(),
stdin_mode: PipeStdinMode::Null,
};
spawn_process_internal(opts).await
}
pub async fn spawn_process_with_options(opts: PipeSpawnOptions) -> Result<SpawnedProcess> {
spawn_process_internal(opts).await
}
#[cfg(test)]
mod tests {
use super::*;
fn find_echo_command() -> Option<(String, Vec<String>)> {
#[cfg(windows)]
{
Some((
"cmd.exe".to_string(),
vec!["/C".to_string(), "echo".to_string()],
))
}
#[cfg(not(windows))]
{
Some(("echo".to_string(), vec![]))
}
}
#[tokio::test]
async fn test_spawn_process_echo() -> Result<()> {
let Some((program, mut base_args)) = find_echo_command() else {
return Ok(());
};
base_args.push("hello".to_string());
let env: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_process(&program, &base_args, Path::new("."), &env, &None).await?;
let exit_code = spawned.exit_rx.await.unwrap_or(-1);
assert_eq!(exit_code, 0);
Ok(())
}
#[tokio::test]
async fn test_spawn_options_builder() {
let opts = PipeSpawnOptions::new("echo", ".")
.args(["hello", "world"])
.stdin_mode(PipeStdinMode::Null);
assert_eq!(opts.program, "echo");
assert_eq!(opts.args, vec!["hello", "world"]);
assert!(matches!(opts.stdin_mode, PipeStdinMode::Null));
}
}