use crate::error::SpawnError;
use crate::output_stream::broadcast::BroadcastOutputStream;
use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
use crate::output_stream::{BackpressureControl, DEFAULT_CHANNEL_CAPACITY, DEFAULT_CHUNK_SIZE};
use crate::process_handle::SingleSubscriberStreamConfig;
use crate::{NumBytes, ProcessHandle};
use std::borrow::Cow;
/// Strategy used to derive a process name from its command when no explicit name was given.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoName {
/// Build the name from the command parts selected by the given [`AutoNameSettings`].
Using(AutoNameSettings),
/// Use the `Debug` representation of the command as the name.
Debug,
}
impl Default for AutoName {
fn default() -> Self {
Self::Using(AutoNameSettings::program_with_args())
}
}
/// Selects which parts of a command are rendered into an automatically generated process name.
///
/// Values are obtained through the preset constructors: [`AutoNameSettings::program_only`],
/// [`AutoNameSettings::program_with_args`], [`AutoNameSettings::program_with_env_and_args`]
/// and [`AutoNameSettings::full`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[expect(
    clippy::struct_excessive_bools,
    reason = "each flag controls one optional part of the generated process name"
)]
pub struct AutoNameSettings {
    /// Prefix the name with the command's working directory (if one is set) and ` % `.
    include_current_dir: bool,
    /// Render every environment variable that has a value as `KEY=value`.
    include_envs: bool,
    /// Render the program of the command.
    include_program: bool,
    /// Render every argument, each wrapped in double quotes.
    include_args: bool,
}

impl AutoNameSettings {
    /// Only the program, e.g. `ls`.
    #[must_use]
    pub fn program_only() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: false,
            include_program: true,
            include_args: false,
        }
    }

    /// The program followed by its quoted arguments, e.g. `ls "-la"`.
    #[must_use]
    pub fn program_with_args() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: false,
            include_program: true,
            include_args: true,
        }
    }

    /// Environment variables, the program and its quoted arguments, e.g. `FOO=foo ls "-la"`.
    #[must_use]
    pub fn program_with_env_and_args() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: true,
            include_program: true,
            include_args: true,
        }
    }

    /// Working directory, environment variables, the program and its quoted arguments,
    /// e.g. `./ % FOO=foo ls "-la"`.
    #[must_use]
    pub fn full() -> Self {
        AutoNameSettings {
            include_current_dir: true,
            include_envs: true,
            include_program: true,
            include_args: true,
        }
    }

    /// Renders `cmd` into a name according to the enabled flags.
    ///
    /// Segments appear in the order: working directory, environment variables, program,
    /// arguments. Non-UTF-8 data is converted lossily. Arguments are wrapped in double quotes
    /// as-is; embedded quotes are not escaped.
    fn format_cmd(self, cmd: &std::process::Command) -> String {
        let mut name = String::new();

        if let (true, Some(current_dir)) = (self.include_current_dir, cmd.get_current_dir()) {
            name.push_str(&current_dir.to_string_lossy());
            name.push_str(" % ");
        }

        if self.include_envs {
            // Entries without a value are skipped, only `KEY=value` pairs are rendered.
            let envs_with_value = cmd
                .get_envs()
                .filter_map(|(key, value)| value.map(|value| (key, value)));
            for (key, value) in envs_with_value {
                name.push_str(&key.to_string_lossy());
                name.push('=');
                name.push_str(&value.to_string_lossy());
                name.push(' ');
            }
        }

        if self.include_program {
            name.push_str(&cmd.get_program().to_string_lossy());
            name.push(' ');
        }

        if self.include_args {
            for arg in cmd.get_args() {
                name.push('"');
                name.push_str(&arg.to_string_lossy());
                name.push_str("\" ");
            }
        }

        // Every segment above leaves a trailing separator space; drop the final one.
        if name.ends_with(' ') {
            name.pop();
        }
        name
    }
}
/// How a spawned process gets its name: either given explicitly or derived from the command.
#[derive(Debug, Clone)]
pub enum ProcessName {
/// A caller-provided name, used verbatim.
Explicit(Cow<'static, str>),
/// A name generated from the command according to the given [`AutoName`] strategy.
Auto(AutoName),
}
impl Default for ProcessName {
fn default() -> Self {
Self::Auto(AutoName::default())
}
}
impl From<&'static str> for ProcessName {
fn from(s: &'static str) -> Self {
Self::Explicit(Cow::Borrowed(s))
}
}
impl From<String> for ProcessName {
fn from(s: String) -> Self {
Self::Explicit(Cow::Owned(s))
}
}
impl From<Cow<'static, str>> for ProcessName {
fn from(s: Cow<'static, str>) -> Self {
Self::Explicit(s)
}
}
impl From<AutoName> for ProcessName {
fn from(mode: AutoName) -> Self {
Self::Auto(mode)
}
}
/// Builder collecting a command plus naming and output-stream settings before spawning it
/// via `spawn_broadcast` or `spawn_single_subscriber`.
pub struct Process {
// The command that will be spawned.
cmd: tokio::process::Command,
// Explicit name or auto-naming strategy; resolved to a concrete name when spawning.
name: ProcessName,
// Chunk size handed to the stdout stream.
stdout_chunk_size: NumBytes,
// Chunk size handed to the stderr stream.
stderr_chunk_size: NumBytes,
// Channel capacity handed to the stdout stream.
stdout_capacity: usize,
// Channel capacity handed to the stderr stream.
stderr_capacity: usize,
// Backpressure setting for stdout; only forwarded by `spawn_single_subscriber`.
stdout_backpressure_control: BackpressureControl,
// Backpressure setting for stderr; only forwarded by `spawn_single_subscriber`.
stderr_backpressure_control: BackpressureControl,
}
impl Process {
    /// Starts a builder for `cmd` with an automatically generated name, the default chunk
    /// sizes and channel capacities, and `DropLatestIncomingIfBufferFull` backpressure for
    /// both streams.
    #[must_use]
    pub fn new(cmd: tokio::process::Command) -> Self {
        let default_backpressure = BackpressureControl::DropLatestIncomingIfBufferFull;
        Self {
            cmd,
            name: ProcessName::default(),
            stdout_chunk_size: DEFAULT_CHUNK_SIZE,
            stderr_chunk_size: DEFAULT_CHUNK_SIZE,
            stdout_capacity: DEFAULT_CHANNEL_CAPACITY,
            stderr_capacity: DEFAULT_CHANNEL_CAPACITY,
            stdout_backpressure_control: default_backpressure,
            stderr_backpressure_control: default_backpressure,
        }
    }

    /// Sets how the process is named; accepts anything convertible into a [`ProcessName`].
    #[must_use]
    pub fn name(self, name: impl Into<ProcessName>) -> Self {
        Self {
            name: name.into(),
            ..self
        }
    }

    /// Gives the process an explicit name.
    #[must_use]
    pub fn with_name(self, name: impl Into<Cow<'static, str>>) -> Self {
        let explicit = ProcessName::Explicit(name.into());
        self.name(explicit)
    }

    /// Names the process automatically using the given [`AutoName`] strategy.
    #[must_use]
    pub fn with_auto_name(self, mode: AutoName) -> Self {
        let automatic = ProcessName::Auto(mode);
        self.name(automatic)
    }

    /// Sets the chunk size of the stdout stream.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn stdout_chunk_size(self, chunk_size: NumBytes) -> Self {
        chunk_size.assert_non_zero("chunk_size");
        Self {
            stdout_chunk_size: chunk_size,
            ..self
        }
    }

    /// Sets the chunk size of the stderr stream.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn stderr_chunk_size(self, chunk_size: NumBytes) -> Self {
        chunk_size.assert_non_zero("chunk_size");
        Self {
            stderr_chunk_size: chunk_size,
            ..self
        }
    }

    /// Sets the same chunk size for both the stdout and the stderr stream.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn chunk_sizes(self, chunk_size: NumBytes) -> Self {
        chunk_size.assert_non_zero("chunk_size");
        Self {
            stdout_chunk_size: chunk_size,
            stderr_chunk_size: chunk_size,
            ..self
        }
    }

    /// Sets the channel capacity of the stdout stream.
    #[must_use]
    pub fn stdout_capacity(self, capacity: usize) -> Self {
        Self {
            stdout_capacity: capacity,
            ..self
        }
    }

    /// Sets the channel capacity of the stderr stream.
    #[must_use]
    pub fn stderr_capacity(self, capacity: usize) -> Self {
        Self {
            stderr_capacity: capacity,
            ..self
        }
    }

    /// Sets the same channel capacity for both the stdout and the stderr stream.
    #[must_use]
    pub fn capacities(self, capacity: usize) -> Self {
        Self {
            stdout_capacity: capacity,
            stderr_capacity: capacity,
            ..self
        }
    }

    /// Sets the backpressure behaviour of the stdout stream.
    ///
    /// Only `spawn_single_subscriber` forwards this setting.
    #[must_use]
    pub fn stdout_backpressure_control(
        self,
        backpressure_control: BackpressureControl,
    ) -> Self {
        Self {
            stdout_backpressure_control: backpressure_control,
            ..self
        }
    }

    /// Sets the backpressure behaviour of the stderr stream.
    ///
    /// Only `spawn_single_subscriber` forwards this setting.
    #[must_use]
    pub fn stderr_backpressure_control(
        self,
        backpressure_control: BackpressureControl,
    ) -> Self {
        Self {
            stderr_backpressure_control: backpressure_control,
            ..self
        }
    }

    /// Sets the same backpressure behaviour for both the stdout and the stderr stream.
    ///
    /// Only `spawn_single_subscriber` forwards this setting.
    #[must_use]
    pub fn backpressure_control(self, backpressure_control: BackpressureControl) -> Self {
        Self {
            stdout_backpressure_control: backpressure_control,
            stderr_backpressure_control: backpressure_control,
            ..self
        }
    }

    /// Resolves the configured [`ProcessName`] into the concrete name used for the process.
    fn generate_name(&self) -> Cow<'static, str> {
        match &self.name {
            ProcessName::Explicit(explicit) => explicit.clone(),
            ProcessName::Auto(AutoName::Using(settings)) => {
                Cow::Owned(settings.format_cmd(self.cmd.as_std()))
            }
            ProcessName::Auto(AutoName::Debug) => Cow::Owned(format!("{:?}", self.cmd)),
        }
    }

    /// Spawns the command with broadcast output streams.
    ///
    /// Forwards the name, chunk sizes and channel capacities; the backpressure settings are
    /// not passed on by this method.
    ///
    /// # Errors
    ///
    /// Returns the [`SpawnError`] reported by `ProcessHandle::spawn_with_capacity`.
    pub fn spawn_broadcast(self) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
        let name = self.generate_name();
        let Self {
            cmd,
            stdout_chunk_size,
            stderr_chunk_size,
            stdout_capacity,
            stderr_capacity,
            ..
        } = self;
        ProcessHandle::<BroadcastOutputStream>::spawn_with_capacity(
            name,
            cmd,
            stdout_chunk_size,
            stderr_chunk_size,
            stdout_capacity,
            stderr_capacity,
        )
    }

    /// Spawns the command with single-subscriber output streams.
    ///
    /// Each stream receives its own chunk size, channel capacity and backpressure setting.
    ///
    /// # Errors
    ///
    /// Returns the [`SpawnError`] reported by `ProcessHandle::spawn_with_capacity`.
    pub fn spawn_single_subscriber(
        self,
    ) -> Result<ProcessHandle<SingleSubscriberOutputStream>, SpawnError> {
        let name = self.generate_name();
        let stdout_config = SingleSubscriberStreamConfig {
            chunk_size: self.stdout_chunk_size,
            channel_capacity: self.stdout_capacity,
            backpressure_control: self.stdout_backpressure_control,
        };
        let stderr_config = SingleSubscriberStreamConfig {
            chunk_size: self.stderr_chunk_size,
            channel_capacity: self.stderr_capacity,
            backpressure_control: self.stderr_backpressure_control,
        };
        ProcessHandle::<SingleSubscriberOutputStream>::spawn_with_capacity(
            name,
            self.cmd,
            stdout_config,
            stderr_config,
        )
    }
}
// Tests for the `Process` builder. Most of them spawn `ls`, so they presumably require a
// Unix-like environment with `ls` on the PATH (NOTE(review): confirm for CI targets).
#[cfg(test)]
mod tests {
use super::*;
use crate::{
BackpressureControl, LineParsingOptions, NumBytes, NumBytesExt, Output, OutputStream,
};
use assertr::prelude::*;
use std::path::PathBuf;
use tokio::process::Command;
// A zero chunk size must be rejected by the builder with a panic.
#[test]
#[should_panic(expected = "chunk_size must be greater than zero bytes")]
fn process_builder_panics_on_zero_chunk_size() {
let _process = Process::new(Command::new("ls")).chunk_sizes(NumBytes::zero());
}
// Broadcast spawn without customisation: both streams report the default chunk size and
// channel capacity, and `ls` finishes successfully with output on stdout only.
#[tokio::test]
async fn process_builder_broadcast() {
let mut process = Process::new(Command::new("ls"))
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(process.stdout().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
assert_that!(process.stderr().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
assert_that!(process.stdout().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
assert_that!(process.stderr().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
let Output {
status,
stdout,
stderr,
} = process
.wait_for_completion_with_output(None, LineParsingOptions::default())
.await
.unwrap();
assert_that!(status.success()).is_true();
assert_that!(stdout).is_not_empty();
assert_that!(stderr).is_empty();
}
// Broadcast spawn with distinct stdout/stderr values, so a mix-up between the two streams
// would be detected.
#[tokio::test]
async fn process_builder_broadcast_with_custom_capacities() {
let mut process = Process::new(Command::new("ls"))
.stdout_chunk_size(42.kilobytes())
.stderr_chunk_size(43.kilobytes())
.stdout_capacity(42)
.stderr_capacity(43)
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(process.stdout().chunk_size()).is_equal_to(42.kilobytes());
assert_that!(process.stderr().chunk_size()).is_equal_to(43.kilobytes());
assert_that!(process.stdout().channel_capacity()).is_equal_to(42);
assert_that!(process.stderr().channel_capacity()).is_equal_to(43);
let Output {
status,
stdout,
stderr,
} = process
.wait_for_completion_with_output(None, LineParsingOptions::default())
.await
.unwrap();
assert_that!(status.success()).is_true();
assert_that!(stdout).is_not_empty();
assert_that!(stderr).is_empty();
}
// Per-stream backpressure settings must reach the matching single-subscriber stream.
#[tokio::test]
async fn process_builder_single_subscriber_with_custom_backpressure_controls() {
let mut process = Process::new(Command::new("ls"))
.stdout_backpressure_control(BackpressureControl::BlockUntilBufferHasSpace)
.stderr_backpressure_control(BackpressureControl::DropLatestIncomingIfBufferFull)
.spawn_single_subscriber()
.expect("Failed to spawn");
assert_that!(process.stdout().backpressure_control())
.is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);
assert_that!(process.stderr().backpressure_control())
.is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
let _ = process.wait_for_completion(None).await.unwrap();
}
// The shared `backpressure_control` setter must apply to both streams.
#[tokio::test]
async fn process_builder_single_subscriber_with_shared_backpressure_control() {
let mut process = Process::new(Command::new("ls"))
.backpressure_control(BackpressureControl::BlockUntilBufferHasSpace)
.spawn_single_subscriber()
.expect("Failed to spawn");
assert_that!(process.stdout().backpressure_control())
.is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);
assert_that!(process.stderr().backpressure_control())
.is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);
let _ = process.wait_for_completion(None).await.unwrap();
}
// Single-subscriber spawn without customisation: defaults for chunk size, capacity and
// backpressure are reported, and `ls` finishes successfully with output on stdout only.
#[tokio::test]
async fn process_builder_single_subscriber() {
let mut process = Process::new(Command::new("ls"))
.spawn_single_subscriber()
.expect("Failed to spawn");
assert_that!(process.stdout().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
assert_that!(process.stderr().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
assert_that!(process.stdout().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
assert_that!(process.stderr().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
assert_that!(process.stdout().backpressure_control())
.is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
assert_that!(process.stderr().backpressure_control())
.is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
let Output {
status,
stdout,
stderr,
} = process
.wait_for_completion_with_output(None, LineParsingOptions::default())
.await
.unwrap();
assert_that!(status.success()).is_true();
assert_that!(stdout).is_not_empty();
assert_that!(stderr).is_empty();
}
// Single-subscriber spawn with distinct stdout/stderr chunk sizes and capacities; the
// backpressure setting is left untouched and must still be the default.
#[tokio::test]
async fn process_builder_single_subscriber_with_custom_capacities() {
let mut process = Process::new(Command::new("ls"))
.stdout_chunk_size(42.kilobytes())
.stderr_chunk_size(43.kilobytes())
.stdout_capacity(42)
.stderr_capacity(43)
.spawn_single_subscriber()
.expect("Failed to spawn");
assert_that!(process.stdout().chunk_size()).is_equal_to(42.kilobytes());
assert_that!(process.stderr().chunk_size()).is_equal_to(43.kilobytes());
assert_that!(process.stdout().channel_capacity()).is_equal_to(42);
assert_that!(process.stderr().channel_capacity()).is_equal_to(43);
assert_that!(process.stdout().backpressure_control())
.is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
assert_that!(process.stderr().backpressure_control())
.is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
let Output {
status,
stdout,
stderr,
} = process
.wait_for_completion_with_output(None, LineParsingOptions::default())
.await
.unwrap();
assert_that!(status.success()).is_true();
assert_that!(stdout).is_not_empty();
assert_that!(stderr).is_empty();
}
// Without any naming call the default applies: program plus quoted args; the env var and
// working directory set on the command are not part of the name.
#[tokio::test]
async fn process_builder_auto_name_captures_command_with_args_if_not_otherwise_specified() {
let mut cmd = Command::new("ls");
cmd.arg("-la");
cmd.env("FOO", "foo");
cmd.current_dir(PathBuf::from("./"));
let mut process = Process::new(cmd)
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(&process.name).is_equal_to("ls \"-la\"");
let _ = process.wait_for_completion(None).await;
}
// `program_only` yields just the program, with no trailing separator.
#[tokio::test]
async fn process_builder_auto_name_only_captures_command_when_requested() {
let mut cmd = Command::new("ls");
cmd.arg("-la");
cmd.env("FOO", "foo");
cmd.current_dir(PathBuf::from("./"));
let mut process = Process::new(cmd)
.with_auto_name(AutoName::Using(AutoNameSettings::program_only()))
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(&process.name).is_equal_to("ls");
let _ = process.wait_for_completion(None).await;
}
// `program_with_env_and_args` puts `KEY=value` pairs in front of the program.
#[tokio::test]
async fn process_builder_auto_name_captures_command_with_envs_and_args_when_requested() {
let mut cmd = Command::new("ls");
cmd.arg("-la");
cmd.env("FOO", "foo");
cmd.current_dir(PathBuf::from("./"));
let mut process = Process::new(cmd)
.with_auto_name(AutoName::Using(
AutoNameSettings::program_with_env_and_args(),
))
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(&process.name).is_equal_to("FOO=foo ls \"-la\"");
let _ = process.wait_for_completion(None).await;
}
// `full` additionally prefixes the working directory, separated by ` % `.
#[tokio::test]
async fn process_builder_auto_name_captures_command_with_current_dir_envs_and_args_when_requested()
{
let mut cmd = Command::new("ls");
cmd.arg("-la");
cmd.env("FOO", "foo");
cmd.current_dir(PathBuf::from("./"));
let mut process = Process::new(cmd)
.with_auto_name(AutoName::Using(AutoNameSettings::full()))
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(&process.name).is_equal_to("./ % FOO=foo ls \"-la\"");
let _ = process.wait_for_completion(None).await;
}
// `AutoName::Debug` uses the command's `Debug` output verbatim. The expected string pins
// the current formatting of the `Debug` impl and may need updating if that changes.
#[tokio::test]
async fn process_builder_auto_name_captures_full_command_debug_string_when_requested() {
let mut cmd = Command::new("ls");
cmd.arg("-la");
cmd.env("FOO", "foo");
cmd.current_dir(PathBuf::from("./"));
let mut process = Process::new(cmd)
.with_auto_name(AutoName::Debug)
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(&process.name).is_equal_to(
"Command { std: cd \"./\" && FOO=\"foo\" \"ls\" \"-la\", kill_on_drop: false }",
);
let _ = process.wait_for_completion(None).await;
}
// An explicit, dynamically built (owned) name is used as-is.
#[tokio::test]
async fn process_builder_custom_name() {
let id = 42;
let mut process = Process::new(Command::new("ls"))
.with_name(format!("worker-{id}"))
.spawn_broadcast()
.expect("Failed to spawn");
assert_that!(&process.name).is_equal_to("worker-42");
let _ = process.wait_for_completion(None).await;
}
}