Skip to main content

tokio_process_tools/
process.rs

1//! Builder API for spawning processes with explicit stream type selection.
2
3use crate::error::SpawnError;
4use crate::output_stream::broadcast::BroadcastOutputStream;
5use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
6use crate::output_stream::{BackpressureControl, DEFAULT_CHANNEL_CAPACITY, DEFAULT_CHUNK_SIZE};
7use crate::process_handle::SingleSubscriberStreamConfig;
8use crate::{NumBytes, ProcessHandle};
9use std::borrow::Cow;
10
/// Controls how the process name is automatically generated when not explicitly provided.
///
/// This determines what information is included in the auto-generated process name
/// used for logging and debugging purposes.
///
/// The [`Default`] implementation is `AutoName::Using(AutoNameSettings::program_with_args())`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoName {
    /// Capture a name from the command as specified by the provided settings.
    ///
    /// Each argument is wrapped in double quotes in the generated name.
    ///
    /// Example: `ls "-la"` from `Command::new("ls").arg("-la")` when using
    /// [`AutoNameSettings::program_with_args`].
    Using(AutoNameSettings),

    /// Capture the full Debug representation of the Command.
    ///
    /// Example: `"Command { std: \"ls\" \"-la\", kill_on_drop: false }"`
    ///
    /// Note: This includes internal implementation details and may change with tokio updates.
    Debug,
}
29
30impl Default for AutoName {
31    fn default() -> Self {
32        Self::Using(AutoNameSettings::program_with_args())
33    }
34}
35
/// Controls in detail which parts of the command are automatically captured as the process name.
///
/// Values are obtained through the preset constructors ([`AutoNameSettings::program_only`],
/// [`AutoNameSettings::program_with_args`], [`AutoNameSettings::program_with_env_and_args`]
/// and [`AutoNameSettings::full`]). Enabled parts are always rendered in the order
/// current directory, environment variables, program, arguments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[expect(
    clippy::struct_excessive_bools,
    reason = "each flag controls one optional part of the generated process name"
)]
pub struct AutoNameSettings {
    /// Prefix the name with the command's working directory (if one was set) and `" % "`.
    include_current_dir: bool,
    /// Include every explicitly set environment variable as `KEY=value`.
    include_envs: bool,
    /// Include the program.
    include_program: bool,
    /// Include every argument, each wrapped in double quotes.
    include_args: bool,
}

impl AutoNameSettings {
    /// Capture the program name.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo")` is captured as `ls`.
    #[must_use]
    pub fn program_only() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: false,
            include_program: true,
            include_args: false,
        }
    }

    /// Capture the program name and all arguments.
    ///
    /// Every argument is wrapped in double quotes.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo")` is captured as `ls "-la"`.
    #[must_use]
    pub fn program_with_args() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: false,
            include_program: true,
            include_args: true,
        }
    }

    /// Capture the program name and all environment variables and arguments.
    ///
    /// Only variables that were explicitly set on the command are included.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo")` is captured as
    /// `FOO=foo ls "-la"`.
    #[must_use]
    pub fn program_with_env_and_args() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: true,
            include_program: true,
            include_args: true,
        }
    }

    /// Capture the directory and the program name and all environment variables and arguments.
    ///
    /// The directory prefix is only emitted when a working directory was set on the command.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo").current_dir("/some/dir")` is
    /// captured as `/some/dir % FOO=foo ls "-la"`.
    #[must_use]
    pub fn full() -> Self {
        AutoNameSettings {
            include_current_dir: true,
            include_envs: true,
            include_program: true,
            include_args: true,
        }
    }

    /// Renders `cmd` into a process name according to the enabled parts.
    ///
    /// Non-UTF-8 data is converted lossily. Arguments are wrapped in double quotes without
    /// any escaping, so the result is meant for humans (logs, debugging) and not for
    /// re-execution by a shell.
    fn format_cmd(self, cmd: &std::process::Command) -> String {
        let mut name = String::new();

        // `filter` keeps the directory only when the prefix was requested.
        if let Some(dir) = cmd.get_current_dir().filter(|_| self.include_current_dir) {
            name.push_str(&dir.to_string_lossy());
            name.push_str(" % ");
        }

        if self.include_envs {
            // An entry without a value is a variable that was removed from the command's
            // environment; there is nothing to print for it.
            let assigned = cmd
                .get_envs()
                .filter_map(|(key, value)| value.map(|value| (key, value)));
            for (key, value) in assigned {
                name.push_str(&key.to_string_lossy());
                name.push('=');
                name.push_str(&value.to_string_lossy());
                name.push(' ');
            }
        }

        if self.include_program {
            name.push_str(&cmd.get_program().to_string_lossy());
            name.push(' ');
        }

        if self.include_args {
            for arg in cmd.get_args() {
                name.push('"');
                name.push_str(&arg.to_string_lossy());
                name.push('"');
                name.push(' ');
            }
        }

        // Every part above appends a separator; drop the single trailing one.
        if name.ends_with(' ') {
            name.pop();
        }
        name
    }
}
145
/// Specifies how a process should be named.
///
/// This enum allows you to either provide an explicit name or configure automatic
/// name generation. Using this type ensures you cannot accidentally set both an
/// explicit name and an auto-naming mode at the same time.
#[derive(Debug, Clone)]
pub enum ProcessName {
    /// Use an explicit custom name.
    ///
    /// Example: `ProcessName::Explicit("my-server".into())`
    Explicit(Cow<'static, str>),

    /// Auto-generate the name based on the command.
    ///
    /// Example: `ProcessName::Auto(AutoName::Using(AutoNameSettings::program_with_args()))`
    Auto(AutoName),
}
163
164impl Default for ProcessName {
165    fn default() -> Self {
166        Self::Auto(AutoName::default())
167    }
168}
169
170impl From<&'static str> for ProcessName {
171    fn from(s: &'static str) -> Self {
172        Self::Explicit(Cow::Borrowed(s))
173    }
174}
175
176impl From<String> for ProcessName {
177    fn from(s: String) -> Self {
178        Self::Explicit(Cow::Owned(s))
179    }
180}
181
182impl From<Cow<'static, str>> for ProcessName {
183    fn from(s: Cow<'static, str>) -> Self {
184        Self::Explicit(s)
185    }
186}
187
188impl From<AutoName> for ProcessName {
189    fn from(mode: AutoName) -> Self {
190        Self::Auto(mode)
191    }
192}
193
/// A builder for configuring and spawning a process.
///
/// This provides an ergonomic API for spawning processes while keeping the stream type
/// (broadcast vs single subscriber) explicit at the spawn callsite.
///
/// # Examples
///
/// ```no_run
/// use tokio_process_tools::*;
/// use tokio::process::Command;
///
/// # tokio_test::block_on(async {
/// // Simple case with auto-derived name
/// let process = Process::new(Command::new("ls"))
///     .spawn_broadcast()?;
///
/// // With explicit name (no allocation when using string literal)
/// let process = Process::new(Command::new("server"))
///     .name("my-server")
///     .spawn_single_subscriber()?;
///
/// // With custom capacities
/// let process = Process::new(Command::new("cargo"))
///     .name("test-runner")
///     .stdout_capacity(512)
///     .stderr_capacity(512)
///     .spawn_broadcast()?;
/// # Ok::<_, SpawnError>(())
/// # });
/// ```
pub struct Process {
    /// The command handed to the `spawn_*` methods.
    cmd: tokio::process::Command,
    /// Naming strategy; resolved into the final name when spawning.
    name: ProcessName,
    /// Chunk size passed on for the stdout stream.
    stdout_chunk_size: NumBytes,
    /// Chunk size passed on for the stderr stream.
    stderr_chunk_size: NumBytes,
    /// Channel capacity passed on for the stdout stream.
    stdout_capacity: usize,
    /// Channel capacity passed on for the stderr stream.
    stderr_capacity: usize,
    /// Stdout backpressure policy; only read by `spawn_single_subscriber`.
    stdout_backpressure_control: BackpressureControl,
    /// Stderr backpressure policy; only read by `spawn_single_subscriber`.
    stderr_backpressure_control: BackpressureControl,
}
234
235impl Process {
236    /// Creates a new process builder from a tokio command.
237    ///
238    /// If no name is explicitly set via [`Process::name`], the name will be auto-derived
239    /// from the command's program name.
240    ///
241    /// # Examples
242    ///
243    /// ```no_run
244    /// use tokio_process_tools::*;
245    /// use tokio::process::Command;
246    ///
247    /// # tokio_test::block_on(async {
248    /// let process = Process::new(Command::new("ls"))
249    ///     .spawn_broadcast()?;
250    /// # Ok::<_, SpawnError>(())
251    /// # });
252    /// ```
253    #[must_use]
254    pub fn new(cmd: tokio::process::Command) -> Self {
255        Self {
256            cmd,
257            name: ProcessName::default(),
258            stdout_chunk_size: DEFAULT_CHUNK_SIZE,
259            stderr_chunk_size: DEFAULT_CHUNK_SIZE,
260            stdout_capacity: DEFAULT_CHANNEL_CAPACITY,
261            stderr_capacity: DEFAULT_CHANNEL_CAPACITY,
262            stdout_backpressure_control: BackpressureControl::DropLatestIncomingIfBufferFull,
263            stderr_backpressure_control: BackpressureControl::DropLatestIncomingIfBufferFull,
264        }
265    }
266
267    /// Sets how the process should be named.
268    ///
269    /// You can provide either an explicit name or configure automatic name generation.
270    /// The name is used for logging and debugging purposes.
271    ///
272    /// # Examples
273    ///
274    /// ```no_run
275    /// use tokio_process_tools::*;
276    /// use tokio::process::Command;
277    ///
278    /// # tokio_test::block_on(async {
279    /// // Explicit name
280    /// let process = Process::new(Command::new("server"))
281    ///     .name(ProcessName::Explicit("my-server".into()))
282    ///     .spawn_broadcast()?;
283    ///
284    /// // Auto-generated with arguments
285    /// let mut cmd = Command::new("cargo");
286    /// cmd.arg("test");
287    /// let process = Process::new(cmd)
288    ///     .name(ProcessName::Auto(AutoName::Using(AutoNameSettings::program_with_args())))
289    ///     .spawn_broadcast()?;
290    /// # Ok::<_, SpawnError>(())
291    /// # });
292    /// ```
293    #[must_use]
294    pub fn name(mut self, name: impl Into<ProcessName>) -> Self {
295        self.name = name.into();
296        self
297    }
298
299    /// Convenience method to set an explicit process name.
300    ///
301    /// This is a shorthand for `.name(ProcessName::Explicit(...))`.
302    ///
303    /// # Examples
304    ///
305    /// ```no_run
306    /// use tokio_process_tools::*;
307    /// use tokio::process::Command;
308    ///
309    /// # tokio_test::block_on(async {
310    /// // Static name (no allocation)
311    /// let process = Process::new(Command::new("server"))
312    ///     .with_name("my-server")
313    ///     .spawn_broadcast()?;
314    ///
315    /// // Dynamic name (allocates)
316    /// let id = 42;
317    /// let process = Process::new(Command::new("worker"))
318    ///     .with_name(format!("worker-{id}"))
319    ///     .spawn_single_subscriber()?;
320    /// # Ok::<_, SpawnError>(())
321    /// # });
322    /// ```
323    #[must_use]
324    pub fn with_name(self, name: impl Into<Cow<'static, str>>) -> Self {
325        self.name(ProcessName::Explicit(name.into()))
326    }
327
328    /// Convenience method to configure automatic name generation.
329    ///
330    /// This is a shorthand for `.name(ProcessName::Auto(...))`.
331    ///
332    /// # Examples
333    ///
334    /// ```no_run
335    /// use tokio_process_tools::*;
336    /// use tokio::process::Command;
337    ///
338    /// # tokio_test::block_on(async {
339    /// let mut cmd = Command::new("server");
340    /// cmd.arg("--database").arg("sqlite");
341    /// cmd.env("S3_ENDPOINT", "127.0.0.1:9000");
342    ///
343    /// let process = Process::new(cmd)
344    ///     .with_auto_name(AutoName::Using(AutoNameSettings::program_with_env_and_args()))
345    ///     .spawn_broadcast()?;
346    /// # Ok::<_, SpawnError>(())
347    /// # });
348    /// ```
349    #[must_use]
350    pub fn with_auto_name(self, mode: AutoName) -> Self {
351        self.name(ProcessName::Auto(mode))
352    }
353
354    /// Sets the stdout chunk size.
355    ///
356    /// This controls the size of the buffer used when reading from the process's stdout stream.
357    /// Default is [`DEFAULT_CHUNK_SIZE`].
358    ///
359    /// # Panics
360    ///
361    /// Panics if `chunk_size` is zero.
362    ///
363    /// # Examples
364    ///
365    /// ```no_run
366    /// use tokio_process_tools::*;
367    /// use tokio::process::Command;
368    ///
369    /// # tokio_test::block_on(async {
370    /// let process = Process::new(Command::new("server"))
371    ///     .stdout_chunk_size(32.kilobytes())
372    ///     .spawn_broadcast()?;
373    /// # Ok::<_, SpawnError>(())
374    /// # });
375    /// ```
376    #[must_use]
377    pub fn stdout_chunk_size(mut self, chunk_size: NumBytes) -> Self {
378        chunk_size.assert_non_zero("chunk_size");
379        self.stdout_chunk_size = chunk_size;
380        self
381    }
382
383    /// Sets the stderr chunk size.
384    ///
385    /// This controls the size of the buffer used when reading from the process's stderr stream.
386    /// Default is [`DEFAULT_CHUNK_SIZE`].
387    ///
388    /// # Panics
389    ///
390    /// Panics if `chunk_size` is zero.
391    ///
392    /// # Examples
393    ///
394    /// ```no_run
395    /// use tokio_process_tools::*;
396    /// use tokio::process::Command;
397    ///
398    /// # tokio_test::block_on(async {
399    /// let process = Process::new(Command::new("server"))
400    ///     .stderr_chunk_size(32.kilobytes())
401    ///     .spawn_broadcast()?;
402    /// # Ok::<_, SpawnError>(())
403    /// # });
404    /// ```
405    #[must_use]
406    pub fn stderr_chunk_size(mut self, chunk_size: NumBytes) -> Self {
407        chunk_size.assert_non_zero("chunk_size");
408        self.stderr_chunk_size = chunk_size;
409        self
410    }
411
412    /// Sets the stdout and stderr chunk sizes.
413    ///
414    /// This controls the size of the buffers used when reading from the process's stdout and
415    /// stderr streams.
416    /// Default is [`DEFAULT_CHUNK_SIZE`].
417    ///
418    /// # Panics
419    ///
420    /// Panics if `chunk_size` is zero.
421    ///
422    /// # Examples
423    ///
424    /// ```no_run
425    /// use tokio_process_tools::*;
426    /// use tokio::process::Command;
427    ///
428    /// # tokio_test::block_on(async {
429    /// let process = Process::new(Command::new("server"))
430    ///     .chunk_sizes(32.kilobytes())
431    ///     .spawn_broadcast()?;
432    /// # Ok::<_, SpawnError>(())
433    /// # });
434    /// ```
435    #[must_use]
436    pub fn chunk_sizes(mut self, chunk_size: NumBytes) -> Self {
437        chunk_size.assert_non_zero("chunk_size");
438        self.stdout_chunk_size = chunk_size;
439        self.stderr_chunk_size = chunk_size;
440        self
441    }
442
443    /// Sets the stdout channel capacity.
444    ///
445    /// This controls how many chunks can be buffered before backpressure is applied.
446    /// Default is [`DEFAULT_CHANNEL_CAPACITY`].
447    ///
448    /// # Examples
449    ///
450    /// ```no_run
451    /// use tokio_process_tools::*;
452    /// use tokio::process::Command;
453    ///
454    /// # tokio_test::block_on(async {
455    /// let process = Process::new(Command::new("server"))
456    ///     .stdout_capacity(512)
457    ///     .spawn_broadcast()?;
458    /// # Ok::<_, SpawnError>(())
459    /// # });
460    /// ```
461    #[must_use]
462    pub fn stdout_capacity(mut self, capacity: usize) -> Self {
463        self.stdout_capacity = capacity;
464        self
465    }
466
467    /// Sets the stderr channel capacity.
468    ///
469    /// This controls how many chunks can be buffered before backpressure is applied.
470    /// Default is [`DEFAULT_CHANNEL_CAPACITY`].
471    ///
472    /// # Examples
473    ///
474    /// ```no_run
475    /// use tokio_process_tools::*;
476    /// use tokio::process::Command;
477    ///
478    /// # tokio_test::block_on(async {
479    /// let process = Process::new(Command::new("server"))
480    ///     .stderr_capacity(256)
481    ///     .spawn_broadcast()?;
482    /// # Ok::<_, SpawnError>(())
483    /// # });
484    /// ```
485    #[must_use]
486    pub fn stderr_capacity(mut self, capacity: usize) -> Self {
487        self.stderr_capacity = capacity;
488        self
489    }
490
491    /// Sets the stdout and stderr channel capacity.
492    ///
493    /// This controls how many chunks can be buffered before backpressure is applied.
494    /// Default is [`DEFAULT_CHANNEL_CAPACITY`].
495    ///
496    /// # Examples
497    ///
498    /// ```no_run
499    /// use tokio_process_tools::*;
500    /// use tokio::process::Command;
501    ///
502    /// # tokio_test::block_on(async {
503    /// let process = Process::new(Command::new("server"))
504    ///     .capacities(256)
505    ///     .spawn_broadcast()?;
506    /// # Ok::<_, SpawnError>(())
507    /// # });
508    /// ```
509    #[must_use]
510    pub fn capacities(mut self, capacity: usize) -> Self {
511        self.stdout_capacity = capacity;
512        self.stderr_capacity = capacity;
513        self
514    }
515
516    /// Sets the stdout backpressure policy used by `.spawn_single_subscriber()`.
517    ///
518    /// The default is [`BackpressureControl::DropLatestIncomingIfBufferFull`], which prioritizes
519    /// keeping the child process unblocked over delivering every chunk to the consumer. Use
520    /// [`BackpressureControl::BlockUntilBufferHasSpace`] when you prefer reliable observation over
521    /// throughput, for example when waiting for a startup line in tests.
522    ///
523    /// This setting is ignored by `.spawn_broadcast()`.
524    #[must_use]
525    pub fn stdout_backpressure_control(
526        mut self,
527        backpressure_control: BackpressureControl,
528    ) -> Self {
529        self.stdout_backpressure_control = backpressure_control;
530        self
531    }
532
533    /// Sets the stderr backpressure policy used by `.spawn_single_subscriber()`.
534    ///
535    /// The default is [`BackpressureControl::DropLatestIncomingIfBufferFull`].
536    /// This setting is ignored by `.spawn_broadcast()`.
537    #[must_use]
538    pub fn stderr_backpressure_control(
539        mut self,
540        backpressure_control: BackpressureControl,
541    ) -> Self {
542        self.stderr_backpressure_control = backpressure_control;
543        self
544    }
545
546    /// Sets the stdout and stderr backpressure policy used by `.spawn_single_subscriber()`.
547    ///
548    /// This is a shorthand for configuring both streams with the same
549    /// [`BackpressureControl`]. The default is
550    /// [`BackpressureControl::DropLatestIncomingIfBufferFull`].
551    ///
552    /// This setting is ignored by `.spawn_broadcast()`.
553    #[must_use]
554    pub fn backpressure_control(mut self, backpressure_control: BackpressureControl) -> Self {
555        self.stdout_backpressure_control = backpressure_control;
556        self.stderr_backpressure_control = backpressure_control;
557        self
558    }
559
560    /// Generates a process name based on the configured naming strategy.
561    fn generate_name(&self) -> Cow<'static, str> {
562        match &self.name {
563            ProcessName::Explicit(name) => name.clone(),
564            ProcessName::Auto(auto_name) => match auto_name {
565                AutoName::Using(settings) => settings.format_cmd(self.cmd.as_std()).into(),
566                AutoName::Debug => format!("{:?}", self.cmd).into(),
567            },
568        }
569    }
570
571    /// Spawns the process with broadcast output streams.
572    ///
573    /// Broadcast streams support multiple concurrent consumers of stdout/stderr,
574    /// which is useful when you need to inspect, collect, and process output
575    /// simultaneously. This comes with slightly higher memory overhead due to cloning.
576    ///
577    /// Broadcast streams are lossy under pressure: if a receiver falls behind the bounded
578    /// broadcast buffer, it may miss older chunks. This backend does not support blocking
579    /// backpressure. If you need reliable delivery with backpressure, use
580    /// [`Process::spawn_single_subscriber`] together with
581    /// [`BackpressureControl::BlockUntilBufferHasSpace`].
582    ///
583    /// # Examples
584    ///
585    /// ```no_run
586    /// use tokio_process_tools::*;
587    /// use tokio::process::Command;
588    ///
589    /// # tokio_test::block_on(async {
590    /// let mut process = Process::new(Command::new("ls"))
591    ///     .spawn_broadcast()?;
592    ///
593    /// // Multiple consumers can read the same output
594    /// let _logger = process.stdout().inspect_lines(|line| {
595    ///     println!("{}", line);
596    ///     tokio_process_tools::Next::Continue
597    /// }, Default::default());
598    ///
599    /// let _collector = process.stdout().collect_lines_into_vec(Default::default());
600    /// # Ok::<_, SpawnError>(())
601    /// # });
602    /// ```
603    ///
604    /// # Errors
605    ///
606    /// Returns [`SpawnError::SpawnFailed`] if the process cannot be spawned.
607    pub fn spawn_broadcast(self) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
608        let name = self.generate_name();
609        ProcessHandle::<BroadcastOutputStream>::spawn_with_capacity(
610            name,
611            self.cmd,
612            self.stdout_chunk_size,
613            self.stderr_chunk_size,
614            self.stdout_capacity,
615            self.stderr_capacity,
616        )
617    }
618
619    /// Spawns the process with single subscriber output streams.
620    ///
621    /// Single subscriber streams are more efficient (lower memory, no cloning) but
622    /// only allow one consumer of stdout/stderr at a time. Use this when you only
623    /// need to either inspect OR collect output, not both simultaneously.
624    ///
625    /// # Examples
626    ///
627    /// ```no_run
628    /// use tokio_process_tools::*;
629    /// use tokio::process::Command;
630    ///
631    /// # tokio_test::block_on(async {
632    /// let process = Process::new(Command::new("ls"))
633    ///     .spawn_single_subscriber()?;
634    ///
635    /// // Only one consumer allowed
636    /// let collector = process.stdout().collect_lines_into_vec(Default::default());
637    /// # Ok::<_, SpawnError>(())
638    /// # });
639    /// ```
640    ///
641    /// # Errors
642    ///
643    /// Returns [`SpawnError::SpawnFailed`] if the process cannot be spawned.
644    pub fn spawn_single_subscriber(
645        self,
646    ) -> Result<ProcessHandle<SingleSubscriberOutputStream>, SpawnError> {
647        let name = self.generate_name();
648        ProcessHandle::<SingleSubscriberOutputStream>::spawn_with_capacity(
649            name,
650            self.cmd,
651            SingleSubscriberStreamConfig {
652                chunk_size: self.stdout_chunk_size,
653                channel_capacity: self.stdout_capacity,
654                backpressure_control: self.stdout_backpressure_control,
655            },
656            SingleSubscriberStreamConfig {
657                chunk_size: self.stderr_chunk_size,
658                channel_capacity: self.stderr_capacity,
659                backpressure_control: self.stderr_backpressure_control,
660            },
661        )
662    }
663}
664
// Tests for the `Process` builder. Apart from the zero-chunk-size check, every test
// spawns a real `ls` child process.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        BackpressureControl, LineParsingOptions, NumBytes, NumBytesExt, Output, OutputStream,
    };
    use assertr::prelude::*;
    use std::path::PathBuf;
    use tokio::process::Command;

    // A zero chunk size must be rejected by the builder itself, before anything is spawned.
    #[test]
    #[should_panic(expected = "chunk_size must be greater than zero bytes")]
    fn process_builder_panics_on_zero_chunk_size() {
        let _process = Process::new(Command::new("ls")).chunk_sizes(NumBytes::zero());
    }

    // Broadcast spawn without customization uses the default chunk size and capacity.
    #[tokio::test]
    async fn process_builder_broadcast() {
        let mut process = Process::new(Command::new("ls"))
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(process.stdout().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(process.stderr().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(process.stdout().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
        assert_that!(process.stderr().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);

        let Output {
            status,
            stdout,
            stderr,
        } = process
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(status.success()).is_true();
        assert_that!(stdout).is_not_empty();
        assert_that!(stderr).is_empty();
    }

    // Distinct values per stream prove that stdout and stderr settings are not mixed up.
    #[tokio::test]
    async fn process_builder_broadcast_with_custom_capacities() {
        let mut process = Process::new(Command::new("ls"))
            .stdout_chunk_size(42.kilobytes())
            .stderr_chunk_size(43.kilobytes())
            .stdout_capacity(42)
            .stderr_capacity(43)
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(process.stdout().chunk_size()).is_equal_to(42.kilobytes());
        assert_that!(process.stderr().chunk_size()).is_equal_to(43.kilobytes());
        assert_that!(process.stdout().channel_capacity()).is_equal_to(42);
        assert_that!(process.stderr().channel_capacity()).is_equal_to(43);

        let Output {
            status,
            stdout,
            stderr,
        } = process
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(status.success()).is_true();
        assert_that!(stdout).is_not_empty();
        assert_that!(stderr).is_empty();
    }

    // Per-stream backpressure policies must reach the matching single subscriber stream.
    #[tokio::test]
    async fn process_builder_single_subscriber_with_custom_backpressure_controls() {
        let mut process = Process::new(Command::new("ls"))
            .stdout_backpressure_control(BackpressureControl::BlockUntilBufferHasSpace)
            .stderr_backpressure_control(BackpressureControl::DropLatestIncomingIfBufferFull)
            .spawn_single_subscriber()
            .expect("Failed to spawn");

        assert_that!(process.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);
        assert_that!(process.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);

        let _ = process.wait_for_completion(None).await.unwrap();
    }

    // The shared setter must apply the same policy to both streams.
    #[tokio::test]
    async fn process_builder_single_subscriber_with_shared_backpressure_control() {
        let mut process = Process::new(Command::new("ls"))
            .backpressure_control(BackpressureControl::BlockUntilBufferHasSpace)
            .spawn_single_subscriber()
            .expect("Failed to spawn");

        assert_that!(process.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);
        assert_that!(process.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);

        let _ = process.wait_for_completion(None).await.unwrap();
    }

    // Single subscriber spawn without customization uses all builder defaults.
    #[tokio::test]
    async fn process_builder_single_subscriber() {
        let mut process = Process::new(Command::new("ls"))
            .spawn_single_subscriber()
            .expect("Failed to spawn");

        assert_that!(process.stdout().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(process.stderr().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(process.stdout().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
        assert_that!(process.stderr().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
        assert_that!(process.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
        assert_that!(process.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);

        let Output {
            status,
            stdout,
            stderr,
        } = process
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(status.success()).is_true();
        assert_that!(stdout).is_not_empty();
        assert_that!(stderr).is_empty();
    }

    // Custom sizes are forwarded per stream while the backpressure policy stays at its default.
    #[tokio::test]
    async fn process_builder_single_subscriber_with_custom_capacities() {
        let mut process = Process::new(Command::new("ls"))
            .stdout_chunk_size(42.kilobytes())
            .stderr_chunk_size(43.kilobytes())
            .stdout_capacity(42)
            .stderr_capacity(43)
            .spawn_single_subscriber()
            .expect("Failed to spawn");

        assert_that!(process.stdout().chunk_size()).is_equal_to(42.kilobytes());
        assert_that!(process.stderr().chunk_size()).is_equal_to(43.kilobytes());
        assert_that!(process.stdout().channel_capacity()).is_equal_to(42);
        assert_that!(process.stderr().channel_capacity()).is_equal_to(43);
        assert_that!(process.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
        assert_that!(process.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);

        let Output {
            status,
            stdout,
            stderr,
        } = process
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(status.success()).is_true();
        assert_that!(stdout).is_not_empty();
        assert_that!(stderr).is_empty();
    }

    // Default naming: program plus quoted args; env and current dir are left out.
    #[tokio::test]
    async fn process_builder_auto_name_captures_command_with_args_if_not_otherwise_specified() {
        let mut cmd = Command::new("ls");
        cmd.arg("-la");
        cmd.env("FOO", "foo");
        cmd.current_dir(PathBuf::from("./"));

        let mut process = Process::new(cmd)
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(&process.name).is_equal_to("ls \"-la\"");

        let _ = process.wait_for_completion(None).await;
    }

    // `program_only` drops args, env and current dir from the name.
    #[tokio::test]
    async fn process_builder_auto_name_only_captures_command_when_requested() {
        let mut cmd = Command::new("ls");
        cmd.arg("-la");
        cmd.env("FOO", "foo");
        cmd.current_dir(PathBuf::from("./"));

        let mut process = Process::new(cmd)
            .with_auto_name(AutoName::Using(AutoNameSettings::program_only()))
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(&process.name).is_equal_to("ls");

        let _ = process.wait_for_completion(None).await;
    }

    // `program_with_env_and_args` puts `KEY=value` pairs in front of the program.
    #[tokio::test]
    async fn process_builder_auto_name_captures_command_with_envs_and_args_when_requested() {
        let mut cmd = Command::new("ls");
        cmd.arg("-la");
        cmd.env("FOO", "foo");
        cmd.current_dir(PathBuf::from("./"));

        let mut process = Process::new(cmd)
            .with_auto_name(AutoName::Using(
                AutoNameSettings::program_with_env_and_args(),
            ))
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(&process.name).is_equal_to("FOO=foo ls \"-la\"");

        let _ = process.wait_for_completion(None).await;
    }

    // `full` additionally prefixes the current dir, separated by " % ".
    #[tokio::test]
    async fn process_builder_auto_name_captures_command_with_current_dir_envs_and_args_when_requested()
     {
        let mut cmd = Command::new("ls");
        cmd.arg("-la");
        cmd.env("FOO", "foo");
        cmd.current_dir(PathBuf::from("./"));

        let mut process = Process::new(cmd)
            .with_auto_name(AutoName::Using(AutoNameSettings::full()))
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(&process.name).is_equal_to("./ % FOO=foo ls \"-la\"");

        let _ = process.wait_for_completion(None).await;
    }

    // `AutoName::Debug` uses the command's `Debug` output verbatim, so the expected string
    // is tied to the formatting of the tokio/std versions in use.
    #[tokio::test]
    async fn process_builder_auto_name_captures_full_command_debug_string_when_requested() {
        let mut cmd = Command::new("ls");
        cmd.arg("-la");
        cmd.env("FOO", "foo");
        cmd.current_dir(PathBuf::from("./"));

        let mut process = Process::new(cmd)
            .with_auto_name(AutoName::Debug)
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(&process.name).is_equal_to(
            "Command { std: cd \"./\" && FOO=\"foo\" \"ls\" \"-la\", kill_on_drop: false }",
        );

        let _ = process.wait_for_completion(None).await;
    }

    // An owned, dynamically built name is used as-is.
    #[tokio::test]
    async fn process_builder_custom_name() {
        let id = 42;
        let mut process = Process::new(Command::new("ls"))
            .with_name(format!("worker-{id}"))
            .spawn_broadcast()
            .expect("Failed to spawn");

        assert_that!(&process.name).is_equal_to("worker-42");

        let _ = process.wait_for_completion(None).await;
    }
}
929}