tokio_process_tools/process.rs
1//! Builder API for spawning processes with explicit stream type selection.
2
3use crate::error::SpawnError;
4use crate::output_stream::broadcast::BroadcastOutputStream;
5use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
6use crate::output_stream::{BackpressureControl, DEFAULT_CHANNEL_CAPACITY, DEFAULT_CHUNK_SIZE};
7use crate::process_handle::SingleSubscriberStreamConfig;
8use crate::{NumBytes, ProcessHandle};
9use std::borrow::Cow;
10
/// Controls how the process name is automatically generated when not explicitly provided.
///
/// This determines what information is included in the auto-generated process name
/// used for logging and debugging purposes.
///
/// The default is `AutoName::Using(AutoNameSettings::program_with_args())`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoName {
    /// Capture a name from the command as specified by the provided settings.
    ///
    /// Every argument is wrapped in double quotes in the generated name.
    ///
    /// Example: `ls "-la"` from `Command::new("ls").arg("-la")` when used with
    /// [`AutoNameSettings::program_with_args`].
    Using(AutoNameSettings),

    /// Capture the full Debug representation of the Command.
    ///
    /// Example: `"Command { std: \"ls\" \"-la\", kill_on_drop: false }"`
    ///
    /// Note: This includes internal implementation details and may change with tokio updates.
    Debug,
}
29
30impl Default for AutoName {
31 fn default() -> Self {
32 Self::Using(AutoNameSettings::program_with_args())
33 }
34}
35
/// Controls in detail which parts of the command are automatically captured as the process name.
///
/// Values are obtained from one of the preset constructors:
/// [`AutoNameSettings::program_only`], [`AutoNameSettings::program_with_args`],
/// [`AutoNameSettings::program_with_env_and_args`] or [`AutoNameSettings::full`].
///
/// Enabled parts are rendered in this order: working directory, environment variables,
/// program, arguments. Every argument is wrapped in double quotes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[expect(
    clippy::struct_excessive_bools,
    reason = "each flag controls one optional part of the generated process name"
)]
pub struct AutoNameSettings {
    /// Prefix the name with the working directory set on the command (if any) and `" % "`.
    include_current_dir: bool,
    /// Include every environment variable explicitly set on the command as `KEY=value`.
    include_envs: bool,
    /// Include the program.
    include_program: bool,
    /// Include all arguments, each wrapped in double quotes.
    include_args: bool,
}

impl AutoNameSettings {
    /// Capture the program name.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo")` is captured as `ls`.
    #[must_use]
    pub fn program_only() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: false,
            include_program: true,
            include_args: false,
        }
    }

    /// Capture the program name and all arguments.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo")` is captured as `ls "-la"`.
    #[must_use]
    pub fn program_with_args() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: false,
            include_program: true,
            include_args: true,
        }
    }

    /// Capture the program name and all environment variables and arguments.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo")` is captured as
    /// `FOO=foo ls "-la"`.
    #[must_use]
    pub fn program_with_env_and_args() -> Self {
        AutoNameSettings {
            include_current_dir: false,
            include_envs: true,
            include_program: true,
            include_args: true,
        }
    }

    /// Capture the directory and the program name and all environment variables and arguments.
    ///
    /// Example: `Command::new("ls").arg("-la").env("FOO", "foo")` with a working directory of
    /// `/some/dir` is captured as `/some/dir % FOO=foo ls "-la"`. The directory prefix is
    /// omitted when no working directory was set on the command.
    #[must_use]
    pub fn full() -> Self {
        AutoNameSettings {
            include_current_dir: true,
            include_envs: true,
            include_program: true,
            include_args: true,
        }
    }

    /// Renders `cmd` into a process name according to the enabled parts.
    ///
    /// Non-UTF-8 components are converted lossily. Environment entries that only record the
    /// removal of a variable (no value) are skipped. Parts are separated by single spaces and
    /// one trailing space, if present, is removed from the result.
    fn format_cmd(self, cmd: &std::process::Command) -> String {
        let mut name = String::new();

        if let (true, Some(current_dir)) = (self.include_current_dir, cmd.get_current_dir()) {
            name.push_str(&current_dir.to_string_lossy());
            name.push_str(" % ");
        }

        if self.include_envs {
            // `get_envs` yields `None` values for variables that were explicitly removed;
            // only variables that actually carry a value are part of the name.
            let assigned_envs = cmd
                .get_envs()
                .filter_map(|(key, value)| value.map(|value| (key, value)));
            for (key, value) in assigned_envs {
                name.push_str(&key.to_string_lossy());
                name.push('=');
                name.push_str(&value.to_string_lossy());
                name.push(' ');
            }
        }

        if self.include_program {
            name.push_str(&cmd.get_program().to_string_lossy());
            name.push(' ');
        }

        if self.include_args {
            for arg in cmd.get_args() {
                name.push('"');
                name.push_str(&arg.to_string_lossy());
                name.push('"');
                name.push(' ');
            }
        }

        // Every part appends its own separator, so at most one trailing space must go.
        if name.ends_with(' ') {
            name.pop();
        }
        name
    }
}
145
/// Specifies how a process should be named.
///
/// This enum allows you to either provide an explicit name or configure automatic
/// name generation. Using this type ensures you cannot accidentally set both an
/// explicit name and an auto-naming mode at the same time.
///
/// The default is `ProcessName::Auto(AutoName::default())`.
#[derive(Debug, Clone)]
pub enum ProcessName {
    /// Use an explicit custom name.
    ///
    /// Example: `ProcessName::Explicit("my-server".into())`
    Explicit(Cow<'static, str>),

    /// Auto-generate the name based on the command.
    ///
    /// Example: `ProcessName::Auto(AutoName::Using(AutoNameSettings::program_with_args()))`
    Auto(AutoName),
}
163
164impl Default for ProcessName {
165 fn default() -> Self {
166 Self::Auto(AutoName::default())
167 }
168}
169
170impl From<&'static str> for ProcessName {
171 fn from(s: &'static str) -> Self {
172 Self::Explicit(Cow::Borrowed(s))
173 }
174}
175
176impl From<String> for ProcessName {
177 fn from(s: String) -> Self {
178 Self::Explicit(Cow::Owned(s))
179 }
180}
181
182impl From<Cow<'static, str>> for ProcessName {
183 fn from(s: Cow<'static, str>) -> Self {
184 Self::Explicit(s)
185 }
186}
187
188impl From<AutoName> for ProcessName {
189 fn from(mode: AutoName) -> Self {
190 Self::Auto(mode)
191 }
192}
193
/// A builder for configuring and spawning a process.
///
/// This provides an ergonomic API for spawning processes while keeping the stream type
/// (broadcast vs single subscriber) explicit at the spawn callsite.
///
/// # Examples
///
/// ```no_run
/// use tokio_process_tools::*;
/// use tokio::process::Command;
///
/// # tokio_test::block_on(async {
/// // Simple case with auto-derived name
/// let process = Process::new(Command::new("ls"))
///     .spawn_broadcast()?;
///
/// // With explicit name (no allocation when using string literal)
/// let process = Process::new(Command::new("server"))
///     .name("my-server")
///     .spawn_single_subscriber()?;
///
/// // With custom capacities
/// let process = Process::new(Command::new("cargo"))
///     .name("test-runner")
///     .stdout_capacity(512)
///     .stderr_capacity(512)
///     .spawn_broadcast()?;
/// # Ok::<_, SpawnError>(())
/// # });
/// ```
pub struct Process {
    /// The command that is handed to the process handle when spawning.
    cmd: tokio::process::Command,
    /// Naming strategy; resolved into the final name when the process is spawned.
    name: ProcessName,
    /// Chunk size handed to the stdout stream at spawn time.
    stdout_chunk_size: NumBytes,
    /// Chunk size handed to the stderr stream at spawn time.
    stderr_chunk_size: NumBytes,
    /// Channel capacity handed to the stdout stream at spawn time.
    stdout_capacity: usize,
    /// Channel capacity handed to the stderr stream at spawn time.
    stderr_capacity: usize,
    /// Stdout backpressure policy; only passed on by `spawn_single_subscriber`.
    stdout_backpressure_control: BackpressureControl,
    /// Stderr backpressure policy; only passed on by `spawn_single_subscriber`.
    stderr_backpressure_control: BackpressureControl,
}
234
235impl Process {
236 /// Creates a new process builder from a tokio command.
237 ///
238 /// If no name is explicitly set via [`Process::name`], the name will be auto-derived
239 /// from the command's program name.
240 ///
241 /// # Examples
242 ///
243 /// ```no_run
244 /// use tokio_process_tools::*;
245 /// use tokio::process::Command;
246 ///
247 /// # tokio_test::block_on(async {
248 /// let process = Process::new(Command::new("ls"))
249 /// .spawn_broadcast()?;
250 /// # Ok::<_, SpawnError>(())
251 /// # });
252 /// ```
253 #[must_use]
254 pub fn new(cmd: tokio::process::Command) -> Self {
255 Self {
256 cmd,
257 name: ProcessName::default(),
258 stdout_chunk_size: DEFAULT_CHUNK_SIZE,
259 stderr_chunk_size: DEFAULT_CHUNK_SIZE,
260 stdout_capacity: DEFAULT_CHANNEL_CAPACITY,
261 stderr_capacity: DEFAULT_CHANNEL_CAPACITY,
262 stdout_backpressure_control: BackpressureControl::DropLatestIncomingIfBufferFull,
263 stderr_backpressure_control: BackpressureControl::DropLatestIncomingIfBufferFull,
264 }
265 }
266
267 /// Sets how the process should be named.
268 ///
269 /// You can provide either an explicit name or configure automatic name generation.
270 /// The name is used for logging and debugging purposes.
271 ///
272 /// # Examples
273 ///
274 /// ```no_run
275 /// use tokio_process_tools::*;
276 /// use tokio::process::Command;
277 ///
278 /// # tokio_test::block_on(async {
279 /// // Explicit name
280 /// let process = Process::new(Command::new("server"))
281 /// .name(ProcessName::Explicit("my-server".into()))
282 /// .spawn_broadcast()?;
283 ///
284 /// // Auto-generated with arguments
285 /// let mut cmd = Command::new("cargo");
286 /// cmd.arg("test");
287 /// let process = Process::new(cmd)
288 /// .name(ProcessName::Auto(AutoName::Using(AutoNameSettings::program_with_args())))
289 /// .spawn_broadcast()?;
290 /// # Ok::<_, SpawnError>(())
291 /// # });
292 /// ```
293 #[must_use]
294 pub fn name(mut self, name: impl Into<ProcessName>) -> Self {
295 self.name = name.into();
296 self
297 }
298
299 /// Convenience method to set an explicit process name.
300 ///
301 /// This is a shorthand for `.name(ProcessName::Explicit(...))`.
302 ///
303 /// # Examples
304 ///
305 /// ```no_run
306 /// use tokio_process_tools::*;
307 /// use tokio::process::Command;
308 ///
309 /// # tokio_test::block_on(async {
310 /// // Static name (no allocation)
311 /// let process = Process::new(Command::new("server"))
312 /// .with_name("my-server")
313 /// .spawn_broadcast()?;
314 ///
315 /// // Dynamic name (allocates)
316 /// let id = 42;
317 /// let process = Process::new(Command::new("worker"))
318 /// .with_name(format!("worker-{id}"))
319 /// .spawn_single_subscriber()?;
320 /// # Ok::<_, SpawnError>(())
321 /// # });
322 /// ```
323 #[must_use]
324 pub fn with_name(self, name: impl Into<Cow<'static, str>>) -> Self {
325 self.name(ProcessName::Explicit(name.into()))
326 }
327
328 /// Convenience method to configure automatic name generation.
329 ///
330 /// This is a shorthand for `.name(ProcessName::Auto(...))`.
331 ///
332 /// # Examples
333 ///
334 /// ```no_run
335 /// use tokio_process_tools::*;
336 /// use tokio::process::Command;
337 ///
338 /// # tokio_test::block_on(async {
339 /// let mut cmd = Command::new("server");
340 /// cmd.arg("--database").arg("sqlite");
341 /// cmd.env("S3_ENDPOINT", "127.0.0.1:9000");
342 ///
343 /// let process = Process::new(cmd)
344 /// .with_auto_name(AutoName::Using(AutoNameSettings::program_with_env_and_args()))
345 /// .spawn_broadcast()?;
346 /// # Ok::<_, SpawnError>(())
347 /// # });
348 /// ```
349 #[must_use]
350 pub fn with_auto_name(self, mode: AutoName) -> Self {
351 self.name(ProcessName::Auto(mode))
352 }
353
354 /// Sets the stdout chunk size.
355 ///
356 /// This controls the size of the buffer used when reading from the process's stdout stream.
357 /// Default is [`DEFAULT_CHUNK_SIZE`].
358 ///
359 /// # Panics
360 ///
361 /// Panics if `chunk_size` is zero.
362 ///
363 /// # Examples
364 ///
365 /// ```no_run
366 /// use tokio_process_tools::*;
367 /// use tokio::process::Command;
368 ///
369 /// # tokio_test::block_on(async {
370 /// let process = Process::new(Command::new("server"))
371 /// .stdout_chunk_size(32.kilobytes())
372 /// .spawn_broadcast()?;
373 /// # Ok::<_, SpawnError>(())
374 /// # });
375 /// ```
376 #[must_use]
377 pub fn stdout_chunk_size(mut self, chunk_size: NumBytes) -> Self {
378 chunk_size.assert_non_zero("chunk_size");
379 self.stdout_chunk_size = chunk_size;
380 self
381 }
382
383 /// Sets the stderr chunk size.
384 ///
385 /// This controls the size of the buffer used when reading from the process's stderr stream.
386 /// Default is [`DEFAULT_CHUNK_SIZE`].
387 ///
388 /// # Panics
389 ///
390 /// Panics if `chunk_size` is zero.
391 ///
392 /// # Examples
393 ///
394 /// ```no_run
395 /// use tokio_process_tools::*;
396 /// use tokio::process::Command;
397 ///
398 /// # tokio_test::block_on(async {
399 /// let process = Process::new(Command::new("server"))
400 /// .stderr_chunk_size(32.kilobytes())
401 /// .spawn_broadcast()?;
402 /// # Ok::<_, SpawnError>(())
403 /// # });
404 /// ```
405 #[must_use]
406 pub fn stderr_chunk_size(mut self, chunk_size: NumBytes) -> Self {
407 chunk_size.assert_non_zero("chunk_size");
408 self.stderr_chunk_size = chunk_size;
409 self
410 }
411
412 /// Sets the stdout and stderr chunk sizes.
413 ///
414 /// This controls the size of the buffers used when reading from the process's stdout and
415 /// stderr streams.
416 /// Default is [`DEFAULT_CHUNK_SIZE`].
417 ///
418 /// # Panics
419 ///
420 /// Panics if `chunk_size` is zero.
421 ///
422 /// # Examples
423 ///
424 /// ```no_run
425 /// use tokio_process_tools::*;
426 /// use tokio::process::Command;
427 ///
428 /// # tokio_test::block_on(async {
429 /// let process = Process::new(Command::new("server"))
430 /// .chunk_sizes(32.kilobytes())
431 /// .spawn_broadcast()?;
432 /// # Ok::<_, SpawnError>(())
433 /// # });
434 /// ```
435 #[must_use]
436 pub fn chunk_sizes(mut self, chunk_size: NumBytes) -> Self {
437 chunk_size.assert_non_zero("chunk_size");
438 self.stdout_chunk_size = chunk_size;
439 self.stderr_chunk_size = chunk_size;
440 self
441 }
442
443 /// Sets the stdout channel capacity.
444 ///
445 /// This controls how many chunks can be buffered before backpressure is applied.
446 /// Default is [`DEFAULT_CHANNEL_CAPACITY`].
447 ///
448 /// # Examples
449 ///
450 /// ```no_run
451 /// use tokio_process_tools::*;
452 /// use tokio::process::Command;
453 ///
454 /// # tokio_test::block_on(async {
455 /// let process = Process::new(Command::new("server"))
456 /// .stdout_capacity(512)
457 /// .spawn_broadcast()?;
458 /// # Ok::<_, SpawnError>(())
459 /// # });
460 /// ```
461 #[must_use]
462 pub fn stdout_capacity(mut self, capacity: usize) -> Self {
463 self.stdout_capacity = capacity;
464 self
465 }
466
467 /// Sets the stderr channel capacity.
468 ///
469 /// This controls how many chunks can be buffered before backpressure is applied.
470 /// Default is [`DEFAULT_CHANNEL_CAPACITY`].
471 ///
472 /// # Examples
473 ///
474 /// ```no_run
475 /// use tokio_process_tools::*;
476 /// use tokio::process::Command;
477 ///
478 /// # tokio_test::block_on(async {
479 /// let process = Process::new(Command::new("server"))
480 /// .stderr_capacity(256)
481 /// .spawn_broadcast()?;
482 /// # Ok::<_, SpawnError>(())
483 /// # });
484 /// ```
485 #[must_use]
486 pub fn stderr_capacity(mut self, capacity: usize) -> Self {
487 self.stderr_capacity = capacity;
488 self
489 }
490
491 /// Sets the stdout and stderr channel capacity.
492 ///
493 /// This controls how many chunks can be buffered before backpressure is applied.
494 /// Default is [`DEFAULT_CHANNEL_CAPACITY`].
495 ///
496 /// # Examples
497 ///
498 /// ```no_run
499 /// use tokio_process_tools::*;
500 /// use tokio::process::Command;
501 ///
502 /// # tokio_test::block_on(async {
503 /// let process = Process::new(Command::new("server"))
504 /// .capacities(256)
505 /// .spawn_broadcast()?;
506 /// # Ok::<_, SpawnError>(())
507 /// # });
508 /// ```
509 #[must_use]
510 pub fn capacities(mut self, capacity: usize) -> Self {
511 self.stdout_capacity = capacity;
512 self.stderr_capacity = capacity;
513 self
514 }
515
516 /// Sets the stdout backpressure policy used by `.spawn_single_subscriber()`.
517 ///
518 /// The default is [`BackpressureControl::DropLatestIncomingIfBufferFull`], which prioritizes
519 /// keeping the child process unblocked over delivering every chunk to the consumer. Use
520 /// [`BackpressureControl::BlockUntilBufferHasSpace`] when you prefer reliable observation over
521 /// throughput, for example when waiting for a startup line in tests.
522 ///
523 /// This setting is ignored by `.spawn_broadcast()`.
524 #[must_use]
525 pub fn stdout_backpressure_control(
526 mut self,
527 backpressure_control: BackpressureControl,
528 ) -> Self {
529 self.stdout_backpressure_control = backpressure_control;
530 self
531 }
532
533 /// Sets the stderr backpressure policy used by `.spawn_single_subscriber()`.
534 ///
535 /// The default is [`BackpressureControl::DropLatestIncomingIfBufferFull`].
536 /// This setting is ignored by `.spawn_broadcast()`.
537 #[must_use]
538 pub fn stderr_backpressure_control(
539 mut self,
540 backpressure_control: BackpressureControl,
541 ) -> Self {
542 self.stderr_backpressure_control = backpressure_control;
543 self
544 }
545
546 /// Sets the stdout and stderr backpressure policy used by `.spawn_single_subscriber()`.
547 ///
548 /// This is a shorthand for configuring both streams with the same
549 /// [`BackpressureControl`]. The default is
550 /// [`BackpressureControl::DropLatestIncomingIfBufferFull`].
551 ///
552 /// This setting is ignored by `.spawn_broadcast()`.
553 #[must_use]
554 pub fn backpressure_control(mut self, backpressure_control: BackpressureControl) -> Self {
555 self.stdout_backpressure_control = backpressure_control;
556 self.stderr_backpressure_control = backpressure_control;
557 self
558 }
559
560 /// Generates a process name based on the configured naming strategy.
561 fn generate_name(&self) -> Cow<'static, str> {
562 match &self.name {
563 ProcessName::Explicit(name) => name.clone(),
564 ProcessName::Auto(auto_name) => match auto_name {
565 AutoName::Using(settings) => settings.format_cmd(self.cmd.as_std()).into(),
566 AutoName::Debug => format!("{:?}", self.cmd).into(),
567 },
568 }
569 }
570
571 /// Spawns the process with broadcast output streams.
572 ///
573 /// Broadcast streams support multiple concurrent consumers of stdout/stderr,
574 /// which is useful when you need to inspect, collect, and process output
575 /// simultaneously. This comes with slightly higher memory overhead due to cloning.
576 ///
577 /// Broadcast streams are lossy under pressure: if a receiver falls behind the bounded
578 /// broadcast buffer, it may miss older chunks. This backend does not support blocking
579 /// backpressure. If you need reliable delivery with backpressure, use
580 /// [`Process::spawn_single_subscriber`] together with
581 /// [`BackpressureControl::BlockUntilBufferHasSpace`].
582 ///
583 /// # Examples
584 ///
585 /// ```no_run
586 /// use tokio_process_tools::*;
587 /// use tokio::process::Command;
588 ///
589 /// # tokio_test::block_on(async {
590 /// let mut process = Process::new(Command::new("ls"))
591 /// .spawn_broadcast()?;
592 ///
593 /// // Multiple consumers can read the same output
594 /// let _logger = process.stdout().inspect_lines(|line| {
595 /// println!("{}", line);
596 /// tokio_process_tools::Next::Continue
597 /// }, Default::default());
598 ///
599 /// let _collector = process.stdout().collect_lines_into_vec(Default::default());
600 /// # Ok::<_, SpawnError>(())
601 /// # });
602 /// ```
603 ///
604 /// # Errors
605 ///
606 /// Returns [`SpawnError::SpawnFailed`] if the process cannot be spawned.
607 pub fn spawn_broadcast(self) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
608 let name = self.generate_name();
609 ProcessHandle::<BroadcastOutputStream>::spawn_with_capacity(
610 name,
611 self.cmd,
612 self.stdout_chunk_size,
613 self.stderr_chunk_size,
614 self.stdout_capacity,
615 self.stderr_capacity,
616 )
617 }
618
619 /// Spawns the process with single subscriber output streams.
620 ///
621 /// Single subscriber streams are more efficient (lower memory, no cloning) but
622 /// only allow one consumer of stdout/stderr at a time. Use this when you only
623 /// need to either inspect OR collect output, not both simultaneously.
624 ///
625 /// # Examples
626 ///
627 /// ```no_run
628 /// use tokio_process_tools::*;
629 /// use tokio::process::Command;
630 ///
631 /// # tokio_test::block_on(async {
632 /// let process = Process::new(Command::new("ls"))
633 /// .spawn_single_subscriber()?;
634 ///
635 /// // Only one consumer allowed
636 /// let collector = process.stdout().collect_lines_into_vec(Default::default());
637 /// # Ok::<_, SpawnError>(())
638 /// # });
639 /// ```
640 ///
641 /// # Errors
642 ///
643 /// Returns [`SpawnError::SpawnFailed`] if the process cannot be spawned.
644 pub fn spawn_single_subscriber(
645 self,
646 ) -> Result<ProcessHandle<SingleSubscriberOutputStream>, SpawnError> {
647 let name = self.generate_name();
648 ProcessHandle::<SingleSubscriberOutputStream>::spawn_with_capacity(
649 name,
650 self.cmd,
651 SingleSubscriberStreamConfig {
652 chunk_size: self.stdout_chunk_size,
653 channel_capacity: self.stdout_capacity,
654 backpressure_control: self.stdout_backpressure_control,
655 },
656 SingleSubscriberStreamConfig {
657 chunk_size: self.stderr_chunk_size,
658 channel_capacity: self.stderr_capacity,
659 backpressure_control: self.stderr_backpressure_control,
660 },
661 )
662 }
663}
664
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        BackpressureControl, LineParsingOptions, NumBytes, NumBytesExt, Output, OutputStream,
    };
    use assertr::prelude::*;
    use std::path::PathBuf;
    use tokio::process::Command;

    /// Builds `ls -la` with `FOO=foo` set and `./` as working directory, the command shared
    /// by all auto-naming tests.
    fn ls_with_arg_env_and_current_dir() -> Command {
        let mut cmd = Command::new("ls");
        cmd.arg("-la")
            .env("FOO", "foo")
            .current_dir(PathBuf::from("./"));
        cmd
    }

    // A zero chunk size must be rejected by the builder itself.
    #[test]
    #[should_panic(expected = "chunk_size must be greater than zero bytes")]
    fn process_builder_panics_on_zero_chunk_size() {
        let builder = Process::new(Command::new("ls"));
        let _process = builder.chunk_sizes(NumBytes::zero());
    }

    // Broadcast streams pick up the default chunk size and capacity.
    #[tokio::test]
    async fn process_builder_broadcast() {
        let builder = Process::new(Command::new("ls"));
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(handle.stdout().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(handle.stderr().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(handle.stdout().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
        assert_that!(handle.stderr().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);

        let Output {
            status: exit_status,
            stdout: captured_stdout,
            stderr: captured_stderr,
        } = handle
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(exit_status.success()).is_true();
        assert_that!(captured_stdout).is_not_empty();
        assert_that!(captured_stderr).is_empty();
    }

    // Per-stream chunk sizes and capacities reach the broadcast streams.
    #[tokio::test]
    async fn process_builder_broadcast_with_custom_capacities() {
        let builder = Process::new(Command::new("ls"))
            .stdout_chunk_size(42.kilobytes())
            .stderr_chunk_size(43.kilobytes())
            .stdout_capacity(42)
            .stderr_capacity(43);
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(handle.stdout().chunk_size()).is_equal_to(42.kilobytes());
        assert_that!(handle.stderr().chunk_size()).is_equal_to(43.kilobytes());
        assert_that!(handle.stdout().channel_capacity()).is_equal_to(42);
        assert_that!(handle.stderr().channel_capacity()).is_equal_to(43);

        let Output {
            status: exit_status,
            stdout: captured_stdout,
            stderr: captured_stderr,
        } = handle
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(exit_status.success()).is_true();
        assert_that!(captured_stdout).is_not_empty();
        assert_that!(captured_stderr).is_empty();
    }

    // Each stream keeps its individually configured backpressure policy.
    #[tokio::test]
    async fn process_builder_single_subscriber_with_custom_backpressure_controls() {
        let builder = Process::new(Command::new("ls"))
            .stdout_backpressure_control(BackpressureControl::BlockUntilBufferHasSpace)
            .stderr_backpressure_control(BackpressureControl::DropLatestIncomingIfBufferFull);
        let mut handle = builder.spawn_single_subscriber().expect("Failed to spawn");

        assert_that!(handle.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);
        assert_that!(handle.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);

        let _ = handle.wait_for_completion(None).await.unwrap();
    }

    // The shared setter applies the policy to both streams.
    #[tokio::test]
    async fn process_builder_single_subscriber_with_shared_backpressure_control() {
        let builder = Process::new(Command::new("ls"))
            .backpressure_control(BackpressureControl::BlockUntilBufferHasSpace);
        let mut handle = builder.spawn_single_subscriber().expect("Failed to spawn");

        assert_that!(handle.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);
        assert_that!(handle.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::BlockUntilBufferHasSpace);

        let _ = handle.wait_for_completion(None).await.unwrap();
    }

    // Single subscriber streams pick up all defaults, including the backpressure policy.
    #[tokio::test]
    async fn process_builder_single_subscriber() {
        let builder = Process::new(Command::new("ls"));
        let mut handle = builder.spawn_single_subscriber().expect("Failed to spawn");

        assert_that!(handle.stdout().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(handle.stderr().chunk_size()).is_equal_to(DEFAULT_CHUNK_SIZE);
        assert_that!(handle.stdout().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
        assert_that!(handle.stderr().channel_capacity()).is_equal_to(DEFAULT_CHANNEL_CAPACITY);
        assert_that!(handle.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
        assert_that!(handle.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);

        let Output {
            status: exit_status,
            stdout: captured_stdout,
            stderr: captured_stderr,
        } = handle
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(exit_status.success()).is_true();
        assert_that!(captured_stdout).is_not_empty();
        assert_that!(captured_stderr).is_empty();
    }

    // Per-stream chunk sizes and capacities reach the single subscriber streams.
    #[tokio::test]
    async fn process_builder_single_subscriber_with_custom_capacities() {
        let builder = Process::new(Command::new("ls"))
            .stdout_chunk_size(42.kilobytes())
            .stderr_chunk_size(43.kilobytes())
            .stdout_capacity(42)
            .stderr_capacity(43);
        let mut handle = builder.spawn_single_subscriber().expect("Failed to spawn");

        assert_that!(handle.stdout().chunk_size()).is_equal_to(42.kilobytes());
        assert_that!(handle.stderr().chunk_size()).is_equal_to(43.kilobytes());
        assert_that!(handle.stdout().channel_capacity()).is_equal_to(42);
        assert_that!(handle.stderr().channel_capacity()).is_equal_to(43);
        assert_that!(handle.stdout().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);
        assert_that!(handle.stderr().backpressure_control())
            .is_equal_to(BackpressureControl::DropLatestIncomingIfBufferFull);

        let Output {
            status: exit_status,
            stdout: captured_stdout,
            stderr: captured_stderr,
        } = handle
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(exit_status.success()).is_true();
        assert_that!(captured_stdout).is_not_empty();
        assert_that!(captured_stderr).is_empty();
    }

    // Without any naming configuration, program and arguments form the name.
    #[tokio::test]
    async fn process_builder_auto_name_captures_command_with_args_if_not_otherwise_specified() {
        let builder = Process::new(ls_with_arg_env_and_current_dir());
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(&handle.name).is_equal_to("ls \"-la\"");

        let _ = handle.wait_for_completion(None).await;
    }

    // `program_only` drops arguments, environment and directory from the name.
    #[tokio::test]
    async fn process_builder_auto_name_only_captures_command_when_requested() {
        let builder = Process::new(ls_with_arg_env_and_current_dir())
            .with_auto_name(AutoName::Using(AutoNameSettings::program_only()));
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(&handle.name).is_equal_to("ls");

        let _ = handle.wait_for_completion(None).await;
    }

    // `program_with_env_and_args` prefixes the environment assignments.
    #[tokio::test]
    async fn process_builder_auto_name_captures_command_with_envs_and_args_when_requested() {
        let builder = Process::new(ls_with_arg_env_and_current_dir()).with_auto_name(
            AutoName::Using(AutoNameSettings::program_with_env_and_args()),
        );
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(&handle.name).is_equal_to("FOO=foo ls \"-la\"");

        let _ = handle.wait_for_completion(None).await;
    }

    // `full` additionally prefixes the working directory.
    #[tokio::test]
    async fn process_builder_auto_name_captures_command_with_current_dir_envs_and_args_when_requested()
     {
        let builder = Process::new(ls_with_arg_env_and_current_dir())
            .with_auto_name(AutoName::Using(AutoNameSettings::full()));
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(&handle.name).is_equal_to("./ % FOO=foo ls \"-la\"");

        let _ = handle.wait_for_completion(None).await;
    }

    // `AutoName::Debug` uses the Debug representation of the tokio command.
    #[tokio::test]
    async fn process_builder_auto_name_captures_full_command_debug_string_when_requested() {
        let builder =
            Process::new(ls_with_arg_env_and_current_dir()).with_auto_name(AutoName::Debug);
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(&handle.name).is_equal_to(
            "Command { std: cd \"./\" && FOO=\"foo\" \"ls\" \"-la\", kill_on_drop: false }",
        );

        let _ = handle.wait_for_completion(None).await;
    }

    // An explicit, dynamically built name is used verbatim.
    #[tokio::test]
    async fn process_builder_custom_name() {
        let id = 42;
        let builder = Process::new(Command::new("ls")).with_name(format!("worker-{id}"));
        let mut handle = builder.spawn_broadcast().expect("Failed to spawn");

        assert_that!(&handle.name).is_equal_to("worker-42");

        let _ = handle.wait_for_completion(None).await;
    }
}