async_process/lib.rs
1//! Async interface for working with processes.
2//!
3//! This crate is an async version of [`std::process`].
4//!
5//! # Implementation
6//!
7//! A background thread named "async-process" is lazily created on first use, which waits for
8//! spawned child processes to exit and then calls the `wait()` syscall to clean up the "zombie"
9//! processes. This is unlike the `process` API in the standard library, where dropping a running
10//! `Child` leaks its resources.
11//!
12//! This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O
13//! on Windows.
14//!
15//! [`async-io`]: https://docs.rs/async-io
16//! [`blocking`]: https://docs.rs/blocking
17//!
18//! # Examples
19//!
20//! Spawn a process and collect its output:
21//!
22//! ```no_run
23//! # futures_lite::future::block_on(async {
24//! use async_process::Command;
25//!
26//! let out = Command::new("echo").arg("hello").arg("world").output().await?;
27//! assert_eq!(out.stdout, b"hello world\n");
28//! # std::io::Result::Ok(()) });
29//! ```
30//!
31//! Read the output line-by-line as it gets produced:
32//!
33//! ```no_run
34//! # futures_lite::future::block_on(async {
35//! use async_process::{Command, Stdio};
36//! use futures_lite::{io::BufReader, prelude::*};
37//!
38//! let mut child = Command::new("find")
39//! .arg(".")
40//! .stdout(Stdio::piped())
41//! .spawn()?;
42//!
43//! let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();
44//!
45//! while let Some(line) = lines.next().await {
46//! println!("{}", line?);
47//! }
48//! # std::io::Result::Ok(()) });
49//! ```
50
51#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
52#![doc(
53 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55#![doc(
56 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
57)]
58
59use std::convert::Infallible;
60use std::ffi::OsStr;
61use std::fmt;
62use std::path::Path;
63use std::pin::Pin;
64use std::sync::atomic::{AtomicUsize, Ordering};
65use std::sync::{Arc, Mutex, OnceLock};
66use std::task::{Context, Poll};
67use std::thread;
68
69#[cfg(unix)]
70use async_io::Async;
71#[cfg(unix)]
72use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
73
74#[cfg(windows)]
75use blocking::Unblock;
76
77use futures_lite::{future, io, prelude::*};
78
79#[doc(no_inline)]
80pub use std::process::{ExitStatus, Output, Stdio};
81
82#[cfg(unix)]
83pub mod unix;
84#[cfg(windows)]
85pub mod windows;
86
87mod reaper;
88
// Private module holding the `Sealed` marker trait. The module is not `pub`, so the trait is
// only nameable through this crate.
// NOTE(review): presumably a supertrait for the extension traits in `unix`/`windows` so that they
// cannot be implemented downstream — confirm against those modules.
mod sealed {
    pub trait Sealed {}
}
92
/// Test-only flag recording that the "async-process" driver thread has been started.
///
/// `Reaper::start_driver_thread` flips it from `false` to `true` and hits `unreachable!` if it
/// was already set, so tests catch the thread being spawned more than once.
#[cfg(test)]
static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
96
/// The zombie process reaper.
///
/// This structure reaps zombie processes and listens for the `SIGCHLD` signal.
struct Reaper {
    /// Underlying system reaper.
    sys: reaper::Reaper,

    /// The number of tasks polling the SIGCHLD event.
    ///
    /// If this is zero, the `async-process` thread must be spawned. `ensure_driven()` claims the
    /// 0 -> 1 transition before starting that thread, and every `driver()` future adds one for as
    /// long as it is alive.
    drivers: AtomicUsize,

    /// Number of live `Child` instances currently running.
    ///
    /// This is used to prevent the reaper thread from being spawned right as the program closes,
    /// when the reaper thread isn't needed. This represents the number of active processes.
    /// It is incremented in `Child::new` and decremented when the child's guard is dropped.
    child_count: AtomicUsize,
}
115
116impl Reaper {
117 /// Get the singleton instance of the reaper.
118 fn get() -> &'static Self {
119 static REAPER: OnceLock<Reaper> = OnceLock::new();
120
121 REAPER.get_or_init(|| Reaper {
122 sys: reaper::Reaper::new(),
123 drivers: AtomicUsize::new(0),
124 child_count: AtomicUsize::new(0),
125 })
126 }
127
128 /// Ensure that the reaper is driven.
129 ///
130 /// If there are no active `driver()` callers, this will spawn the `async-process` thread.
131 #[inline]
132 fn ensure_driven(&'static self) {
133 if self
134 .drivers
135 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
136 .is_ok()
137 {
138 self.start_driver_thread();
139 }
140 }
141
142 /// Start the `async-process` thread.
143 #[cold]
144 fn start_driver_thread(&'static self) {
145 #[cfg(test)]
146 DRIVER_THREAD_SPAWNED
147 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
148 .unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
149
150 thread::Builder::new()
151 .name("async-process".to_string())
152 .spawn(move || {
153 let driver = async move {
154 // No need to bump self.drivers, it was already bumped in ensure_driven.
155 let guard = self.sys.lock().await;
156 self.sys.reap(guard).await
157 };
158
159 #[cfg(unix)]
160 async_io::block_on(driver);
161
162 #[cfg(not(unix))]
163 future::block_on(driver);
164 })
165 .expect("cannot spawn async-process thread");
166 }
167
168 /// Register a process with this reaper.
169 fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
170 self.ensure_driven();
171 self.sys.register(child)
172 }
173}
174
175cfg_if::cfg_if! {
176 if #[cfg(windows)] {
177 // Wraps a sync I/O type into an async I/O type.
178 fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
179 Ok(Unblock::new(io))
180 }
181 } else if #[cfg(unix)] {
182 /// Wrap a file descriptor into a non-blocking I/O type.
183 fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
184 Async::new(io)
185 }
186 }
187}
188
/// A guard that can kill child processes, or push them into the zombie list.
struct ChildGuard {
    /// The reaper-side guard returned by `Reaper::register`; it wraps the underlying
    /// `std::process::Child`.
    inner: reaper::ChildGuard,
    /// Whether dropping this guard hands the process to the reaper (copied from the `Command`).
    reap_on_drop: bool,
    /// Whether dropping this guard first tries to kill the process (copied from the `Command`).
    kill_on_drop: bool,
    /// The global reaper the child was registered with; its `child_count` is decremented on drop.
    reaper: &'static Reaper,
}
196
impl ChildGuard {
    /// Returns mutable access to the wrapped `std::process::Child`.
    fn get_mut(&mut self) -> &mut std::process::Child {
        self.inner.get_mut()
    }
}
202
203// When the last reference to the child process is dropped, push it into the zombie list.
204impl Drop for ChildGuard {
205 fn drop(&mut self) {
206 if self.kill_on_drop {
207 self.get_mut().kill().ok();
208 }
209 if self.reap_on_drop {
210 self.inner.reap(&self.reaper.sys);
211 }
212
213 // Decrement number of children.
214 self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
215 }
216}
217
/// A spawned child process.
///
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
/// [`output()`][`Child::output()`] to wait for it to exit.
///
/// If the [`Child`] is dropped, the process keeps running in the background, unless
/// [`Command::kill_on_drop()`] was enabled for it.
///
/// # Examples
///
/// Spawn a process and wait for it to complete:
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
///
/// Command::new("cp").arg("a.txt").arg("b.txt").status().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct Child {
    /// The handle for writing to the child's standard input (stdin), if it has been captured.
    pub stdin: Option<ChildStdin>,

    /// The handle for reading from the child's standard output (stdout), if it has been captured.
    pub stdout: Option<ChildStdout>,

    /// The handle for reading from the child's standard error (stderr), if it has been captured.
    pub stderr: Option<ChildStderr>,

    /// The inner child process handle.
    ///
    /// It is reference-counted so that the future returned by [`Child::status()`] can hold its
    /// own clone instead of borrowing the `Child`.
    child: Arc<Mutex<ChildGuard>>,
}
249
250impl Child {
251 /// Wraps the inner child process handle and registers it in the global process list.
252 ///
253 /// The "async-process" thread waits for processes in the global list and cleans up the
254 /// resources when they exit.
255 fn new(cmd: &mut Command) -> io::Result<Child> {
256 // Make sure the reaper exists before we spawn the child process.
257 let reaper = Reaper::get();
258 let mut child = cmd.inner.spawn()?;
259
260 // Convert sync I/O types into async I/O types.
261 let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
262 let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
263 let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
264
265 // Bump the child count.
266 reaper.child_count.fetch_add(1, Ordering::Relaxed);
267
268 // Register the child process in the global list.
269 let inner = reaper.register(child)?;
270
271 Ok(Child {
272 stdin,
273 stdout,
274 stderr,
275 child: Arc::new(Mutex::new(ChildGuard {
276 inner,
277 reap_on_drop: cmd.reap_on_drop,
278 kill_on_drop: cmd.kill_on_drop,
279 reaper,
280 })),
281 })
282 }
283
284 /// Returns the OS-assigned process identifier associated with this child.
285 ///
286 /// # Examples
287 ///
288 /// ```no_run
289 /// # futures_lite::future::block_on(async {
290 /// use async_process::Command;
291 ///
292 /// let mut child = Command::new("ls").spawn()?;
293 /// println!("id: {}", child.id());
294 /// # std::io::Result::Ok(()) });
295 /// ```
296 pub fn id(&self) -> u32 {
297 self.child.lock().unwrap().get_mut().id()
298 }
299
300 /// Forces the child process to exit.
301 ///
302 /// If the child has already exited, an [`InvalidInput`] error is returned.
303 ///
304 /// This is equivalent to sending a SIGKILL on Unix platforms.
305 ///
306 /// [`InvalidInput`]: `std::io::ErrorKind::InvalidInput`
307 ///
308 /// # Examples
309 ///
310 /// ```no_run
311 /// # futures_lite::future::block_on(async {
312 /// use async_process::Command;
313 ///
314 /// let mut child = Command::new("yes").spawn()?;
315 /// child.kill()?;
316 /// println!("exit status: {}", child.status().await?);
317 /// # std::io::Result::Ok(()) });
318 /// ```
319 pub fn kill(&mut self) -> io::Result<()> {
320 self.child.lock().unwrap().get_mut().kill()
321 }
322
323 /// Returns the exit status if the process has exited.
324 ///
325 /// Unlike [`status()`][`Child::status()`], this method will not drop the stdin handle.
326 ///
327 /// # Examples
328 ///
329 /// ```no_run
330 /// # futures_lite::future::block_on(async {
331 /// use async_process::Command;
332 ///
333 /// let mut child = Command::new("ls").spawn()?;
334 ///
335 /// match child.try_status()? {
336 /// None => println!("still running"),
337 /// Some(status) => println!("exited with: {}", status),
338 /// }
339 /// # std::io::Result::Ok(()) });
340 /// ```
341 pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
342 self.child.lock().unwrap().get_mut().try_wait()
343 }
344
345 /// Drops the stdin handle and waits for the process to exit.
346 ///
347 /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
348 /// not block waiting for input from the parent process while the parent waits for the child to
349 /// exit.
350 ///
351 /// # Examples
352 ///
353 /// ```no_run
354 /// # futures_lite::future::block_on(async {
355 /// use async_process::{Command, Stdio};
356 ///
357 /// let mut child = Command::new("cp")
358 /// .arg("a.txt")
359 /// .arg("b.txt")
360 /// .spawn()?;
361 ///
362 /// println!("exit status: {}", child.status().await?);
363 /// # std::io::Result::Ok(()) });
364 /// ```
365 pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
366 self.stdin.take();
367 let child = self.child.clone();
368
369 async move { Reaper::get().sys.status(&child).await }
370 }
371
372 /// Drops the stdin handle and collects the output of the process.
373 ///
374 /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
375 /// not block waiting for input from the parent process while the parent waits for the child to
376 /// exit.
377 ///
378 /// In order to capture the output of the process, [`Command::stdout()`] and
379 /// [`Command::stderr()`] must be configured with [`Stdio::piped()`].
380 ///
381 /// # Examples
382 ///
383 /// ```no_run
384 /// # futures_lite::future::block_on(async {
385 /// use async_process::{Command, Stdio};
386 ///
387 /// let child = Command::new("ls")
388 /// .stdout(Stdio::piped())
389 /// .stderr(Stdio::piped())
390 /// .spawn()?;
391 ///
392 /// let out = child.output().await?;
393 /// # std::io::Result::Ok(()) });
394 /// ```
395 pub fn output(mut self) -> impl Future<Output = io::Result<Output>> {
396 // A future that waits for the exit status.
397 let status = self.status();
398
399 // A future that collects stdout.
400 let stdout = self.stdout.take();
401 let stdout = async move {
402 let mut v = Vec::new();
403 if let Some(mut s) = stdout {
404 s.read_to_end(&mut v).await?;
405 }
406 io::Result::Ok(v)
407 };
408
409 // A future that collects stderr.
410 let stderr = self.stderr.take();
411 let stderr = async move {
412 let mut v = Vec::new();
413 if let Some(mut s) = stderr {
414 s.read_to_end(&mut v).await?;
415 }
416 io::Result::Ok(v)
417 };
418
419 async move {
420 let (stdout, stderr) = future::try_zip(stdout, stderr).await?;
421 let status = status.await?;
422 Ok(Output {
423 status,
424 stdout,
425 stderr,
426 })
427 }
428 }
429}
430
431impl fmt::Debug for Child {
432 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
433 f.debug_struct("Child")
434 .field("stdin", &self.stdin)
435 .field("stdout", &self.stdout)
436 .field("stderr", &self.stderr)
437 .finish()
438 }
439}
440
/// A handle to a child process's standard input (stdin).
///
/// When a [`ChildStdin`] is dropped, the underlying handle gets closed. If the child process was
/// previously blocked on input, it becomes unblocked after dropping.
#[derive(Debug)]
pub struct ChildStdin(
    // Windows: the std pipe handle wrapped in `blocking::Unblock`.
    #[cfg(windows)] Unblock<std::process::ChildStdin>,
    // Unix: the std pipe handle wrapped in `async_io::Async`.
    #[cfg(unix)] Async<std::process::ChildStdin>,
);
450
451impl ChildStdin {
452 /// Convert async_process::ChildStdin into std::process::Stdio.
453 ///
    /// Use the result to connect this handle to a standard stream of another process.
455 ///
456 /// # Examples
457 ///
458 /// ```no_run
459 /// # futures_lite::future::block_on(async {
460 /// use async_process::Command;
461 /// use std::process::Stdio;
462 ///
463 /// let mut ls_child = Command::new("ls").stdin(Stdio::piped()).spawn()?;
464 /// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?;
465 ///
466 /// let mut echo_child = Command::new("echo").arg("./").stdout(stdio).spawn()?;
467 ///
468 /// # std::io::Result::Ok(()) });
469 /// ```
470 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
471 cfg_if::cfg_if! {
472 if #[cfg(windows)] {
473 Ok(self.0.into_inner().await.into())
474 } else if #[cfg(unix)] {
475 let child_stdin = self.0.into_inner()?;
476 blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?;
477 Ok(child_stdin.into())
478 }
479 }
480 }
481}
482
483impl io::AsyncWrite for ChildStdin {
484 fn poll_write(
485 mut self: Pin<&mut Self>,
486 cx: &mut Context<'_>,
487 buf: &[u8],
488 ) -> Poll<io::Result<usize>> {
489 Pin::new(&mut self.0).poll_write(cx, buf)
490 }
491
492 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
493 Pin::new(&mut self.0).poll_flush(cx)
494 }
495
496 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
497 Pin::new(&mut self.0).poll_close(cx)
498 }
499}
500
501#[cfg(unix)]
502impl AsRawFd for ChildStdin {
503 fn as_raw_fd(&self) -> RawFd {
504 self.0.as_raw_fd()
505 }
506}
507
508#[cfg(unix)]
509impl AsFd for ChildStdin {
510 fn as_fd(&self) -> BorrowedFd<'_> {
511 self.0.as_fd()
512 }
513}
514
515#[cfg(unix)]
516impl TryFrom<ChildStdin> for OwnedFd {
517 type Error = io::Error;
518
519 fn try_from(value: ChildStdin) -> Result<Self, Self::Error> {
520 value.0.try_into()
521 }
522}
523
524// TODO(notgull): Add mirroring AsRawHandle impls for all of the child handles
525//
526// at the moment this is pretty hard to do because of how they're wrapped in
527// Unblock, meaning that we can't always access the underlying handle. async-fs
528// gets around this by putting the handle in an Arc, but there's still some decision
529// to be made about how to handle this (no pun intended)
530
/// A handle to a child process's standard output (stdout).
///
/// When a [`ChildStdout`] is dropped, the underlying handle gets closed.
#[derive(Debug)]
pub struct ChildStdout(
    // Windows: the std pipe handle wrapped in `blocking::Unblock`.
    #[cfg(windows)] Unblock<std::process::ChildStdout>,
    // Unix: the std pipe handle wrapped in `async_io::Async`.
    #[cfg(unix)] Async<std::process::ChildStdout>,
);
539
540impl ChildStdout {
541 /// Convert async_process::ChildStdout into std::process::Stdio.
542 ///
    /// Use the result to connect this handle to a standard stream of another process.
544 ///
545 /// # Examples
546 ///
547 /// ```no_run
548 /// # futures_lite::future::block_on(async {
549 /// use async_process::Command;
550 /// use std::process::Stdio;
551 /// use std::io::Read;
552 /// use futures_lite::AsyncReadExt;
553 ///
554 /// let mut ls_child = Command::new("ls").stdout(Stdio::piped()).spawn()?;
555 /// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
556 ///
557 /// let mut echo_child = Command::new("echo").stdin(stdio).stdout(Stdio::piped()).spawn()?;
558 /// let mut buf = vec![];
    /// echo_child.stdout.take().unwrap().read_to_end(&mut buf).await?;
560 /// # std::io::Result::Ok(()) });
561 /// ```
562 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
563 cfg_if::cfg_if! {
564 if #[cfg(windows)] {
565 Ok(self.0.into_inner().await.into())
566 } else if #[cfg(unix)] {
567 let child_stdout = self.0.into_inner()?;
568 blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?;
569 Ok(child_stdout.into())
570 }
571 }
572 }
573}
574
575impl io::AsyncRead for ChildStdout {
576 fn poll_read(
577 mut self: Pin<&mut Self>,
578 cx: &mut Context<'_>,
579 buf: &mut [u8],
580 ) -> Poll<io::Result<usize>> {
581 Pin::new(&mut self.0).poll_read(cx, buf)
582 }
583}
584
585#[cfg(unix)]
586impl AsRawFd for ChildStdout {
587 fn as_raw_fd(&self) -> RawFd {
588 self.0.as_raw_fd()
589 }
590}
591
592#[cfg(unix)]
593impl AsFd for ChildStdout {
594 fn as_fd(&self) -> BorrowedFd<'_> {
595 self.0.as_fd()
596 }
597}
598
599#[cfg(unix)]
600impl TryFrom<ChildStdout> for OwnedFd {
601 type Error = io::Error;
602
603 fn try_from(value: ChildStdout) -> Result<Self, Self::Error> {
604 value.0.try_into()
605 }
606}
607
/// A handle to a child process's standard error (stderr).
///
/// When a [`ChildStderr`] is dropped, the underlying handle gets closed.
#[derive(Debug)]
pub struct ChildStderr(
    // Windows: the std pipe handle wrapped in `blocking::Unblock`.
    #[cfg(windows)] Unblock<std::process::ChildStderr>,
    // Unix: the std pipe handle wrapped in `async_io::Async`.
    #[cfg(unix)] Async<std::process::ChildStderr>,
);
616
617impl ChildStderr {
618 /// Convert async_process::ChildStderr into std::process::Stdio.
619 ///
    /// Use the result to connect this handle to a standard stream of another process.
621 ///
622 /// # Examples
623 ///
624 /// ```no_run
625 /// # futures_lite::future::block_on(async {
626 /// use async_process::Command;
627 /// use std::process::Stdio;
628 ///
629 /// let mut ls_child = Command::new("ls").arg("x").stderr(Stdio::piped()).spawn()?;
630 /// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?;
631 ///
632 /// let mut echo_child = Command::new("echo").stdin(stdio).spawn()?;
633 /// # std::io::Result::Ok(()) });
634 /// ```
635 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
636 cfg_if::cfg_if! {
637 if #[cfg(windows)] {
638 Ok(self.0.into_inner().await.into())
639 } else if #[cfg(unix)] {
640 let child_stderr = self.0.into_inner()?;
641 blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?;
642 Ok(child_stderr.into())
643 }
644 }
645 }
646}
647
648impl io::AsyncRead for ChildStderr {
649 fn poll_read(
650 mut self: Pin<&mut Self>,
651 cx: &mut Context<'_>,
652 buf: &mut [u8],
653 ) -> Poll<io::Result<usize>> {
654 Pin::new(&mut self.0).poll_read(cx, buf)
655 }
656}
657
658#[cfg(unix)]
659impl AsRawFd for ChildStderr {
660 fn as_raw_fd(&self) -> RawFd {
661 self.0.as_raw_fd()
662 }
663}
664
665#[cfg(unix)]
666impl AsFd for ChildStderr {
667 fn as_fd(&self) -> BorrowedFd<'_> {
668 self.0.as_fd()
669 }
670}
671
672#[cfg(unix)]
673impl TryFrom<ChildStderr> for OwnedFd {
674 type Error = io::Error;
675
676 fn try_from(value: ChildStderr) -> Result<Self, Self::Error> {
677 value.0.try_into()
678 }
679}
680
681/// Runs the driver for the asynchronous processes.
682///
683/// This future takes control of global structures related to driving [`Child`]ren and reaping
684/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
685/// making sure zombie processes are successfully waited on.
686///
687/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
688/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
689/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
690/// will be spawned. The "async-process" thread just blocks on this future and will automatically
691/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
692///
/// This future will never complete. It is intended to be run on a background task in your
/// executor of choice.
695///
696/// # Examples
697///
698/// ```no_run
699/// use async_executor::Executor;
700/// use async_process::{driver, Command};
701///
702/// # futures_lite::future::block_on(async {
703/// // Create an executor and run on it.
704/// let ex = Executor::new();
705/// ex.run(async {
706/// // Run the driver future in the background.
707/// ex.spawn(driver()).detach();
708///
709/// // Run a command.
710/// Command::new("ls").output().await.ok();
711/// }).await;
712/// # });
713/// ```
714#[allow(clippy::manual_async_fn)]
715#[inline]
716pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
717 async {
718 // Get the reaper.
719 let reaper = Reaper::get();
720
721 // Make sure the reaper knows we're driving it.
722 reaper.drivers.fetch_add(1, Ordering::SeqCst);
723
724 // Decrement the driver count when this future is dropped.
725 let _guard = CallOnDrop(|| {
726 let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);
727
728 // If this was the last driver, and there are still resources actively using the
729 // reaper, make sure that there is a thread driving the reaper.
730 if prev_count == 1
731 && (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
732 {
733 reaper.ensure_driven();
734 }
735 });
736
737 // Acquire the reaper lock and start polling the SIGCHLD event.
738 let guard = reaper.sys.lock().await;
739 reaper.sys.reap(guard).await
740 }
741}
742
743/// A builder for spawning processes.
744///
745/// # Examples
746///
747/// ```no_run
748/// # futures_lite::future::block_on(async {
749/// use async_process::Command;
750///
751/// let output = if cfg!(target_os = "windows") {
752/// Command::new("cmd").args(&["/C", "echo hello"]).output().await?
753/// } else {
754/// Command::new("sh").arg("-c").arg("echo hello").output().await?
755/// };
756/// # std::io::Result::Ok(()) });
757/// ```
pub struct Command {
    /// The wrapped standard-library command that actually spawns the process.
    inner: std::process::Command,
    /// Set once `stdin()` has been called; `spawn()`/`output()` then skip their default.
    stdin: bool,
    /// Set once `stdout()` has been called; `spawn()`/`output()` then skip their default.
    stdout: bool,
    /// Set once `stderr()` has been called; `spawn()`/`output()` then skip their default.
    stderr: bool,
    /// Whether a dropped `Child` is handed to the reaper (`true` by default).
    reap_on_drop: bool,
    /// Whether a dropped `Child` kills its process (`false` by default).
    kill_on_drop: bool,
}
766
767impl Command {
768 /// Constructs a new [`Command`] for launching `program`.
769 ///
770 /// The initial configuration (the working directory and environment variables) is inherited
771 /// from the current process.
772 ///
773 /// # Examples
774 ///
775 /// ```
776 /// use async_process::Command;
777 ///
778 /// let mut cmd = Command::new("ls");
779 /// ```
780 pub fn new<S: AsRef<OsStr>>(program: S) -> Command {
781 Self::from(std::process::Command::new(program))
782 }
783
784 /// Adds a single argument to pass to the program.
785 ///
786 /// # Examples
787 ///
788 /// ```
789 /// use async_process::Command;
790 ///
791 /// let mut cmd = Command::new("echo");
792 /// cmd.arg("hello");
793 /// cmd.arg("world");
794 /// ```
795 pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command {
796 self.inner.arg(arg);
797 self
798 }
799
800 /// Adds multiple arguments to pass to the program.
801 ///
802 /// # Examples
803 ///
804 /// ```
805 /// use async_process::Command;
806 ///
807 /// let mut cmd = Command::new("echo");
808 /// cmd.args(&["hello", "world"]);
809 /// ```
810 pub fn args<I, S>(&mut self, args: I) -> &mut Command
811 where
812 I: IntoIterator<Item = S>,
813 S: AsRef<OsStr>,
814 {
815 self.inner.args(args);
816 self
817 }
818
819 /// Configures an environment variable for the new process.
820 ///
821 /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
822 /// and case-sensitive on all other platforms.
823 ///
824 /// # Examples
825 ///
826 /// ```
827 /// use async_process::Command;
828 ///
829 /// let mut cmd = Command::new("ls");
830 /// cmd.env("PATH", "/bin");
831 /// ```
832 pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command
833 where
834 K: AsRef<OsStr>,
835 V: AsRef<OsStr>,
836 {
837 self.inner.env(key, val);
838 self
839 }
840
841 /// Configures multiple environment variables for the new process.
842 ///
843 /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
844 /// and case-sensitive on all other platforms.
845 ///
846 /// # Examples
847 ///
848 /// ```
849 /// use async_process::Command;
850 ///
851 /// let mut cmd = Command::new("ls");
852 /// cmd.envs(vec![("PATH", "/bin"), ("TERM", "xterm-256color")]);
853 /// ```
854 pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command
855 where
856 I: IntoIterator<Item = (K, V)>,
857 K: AsRef<OsStr>,
858 V: AsRef<OsStr>,
859 {
860 self.inner.envs(vars);
861 self
862 }
863
864 /// Gets an iterator of the arguments that will be passed to the program.
865 ///
866 /// # Examples
867 ///
868 /// ```
869 /// use std::ffi::OsStr;
870 /// use async_process::Command;
871 ///
872 /// let mut cmd = Command::new("echo");
873 /// cmd.arg("first").arg("second");
874 /// let args: Vec<&OsStr> = cmd.get_args().collect();
875 /// assert_eq!(args, &["first", "second"]);
876 /// ```
877 pub fn get_args(&self) -> std::process::CommandArgs<'_> {
878 self.inner.get_args()
879 }
880
881 /// Gets an iterator of the environment variables explicitly set for the child process.
882 ///
883 /// # Examples
884 ///
885 /// ```
886 /// use std::ffi::OsStr;
887 /// use async_process::Command;
888 ///
889 /// let mut cmd = Command::new("ls");
890 /// cmd.env("TERM", "dumb").env_remove("TZ");
891 /// let envs: Vec<(&OsStr, Option<&OsStr>)> = cmd.get_envs().collect();
892 /// assert_eq!(envs, &[
893 /// (OsStr::new("TERM"), Some(OsStr::new("dumb"))),
894 /// (OsStr::new("TZ"), None)
895 /// ]);
896 /// ```
897 pub fn get_envs(&self) -> std::process::CommandEnvs<'_> {
898 self.inner.get_envs()
899 }
900
901 /// Gets the working directory for the child process.
902 ///
903 /// This returns [`None`] if the working directory will not be changed.
904 ///
905 /// # Examples
906 ///
907 /// ```
908 /// use std::path::Path;
909 /// use async_process::Command;
910 ///
911 /// let mut cmd = Command::new("ls");
912 /// assert_eq!(cmd.get_current_dir(), None);
913 /// cmd.current_dir("/bin");
914 /// assert_eq!(cmd.get_current_dir(), Some(Path::new("/bin")));
915 /// ```
916 pub fn get_current_dir(&self) -> Option<&Path> {
917 self.inner.get_current_dir()
918 }
919
920 /// Gets the path to the program that was given to [`Command::new`].
921 ///
922 /// # Examples
923 ///
924 /// ```
925 /// use async_process::Command;
926 ///
927 /// let cmd = Command::new("echo");
928 /// assert_eq!(cmd.get_program(), "echo");
929 /// ```
930 pub fn get_program(&self) -> &OsStr {
931 self.inner.get_program()
932 }
933
934 /// Removes an environment variable mapping.
935 ///
936 /// # Examples
937 ///
938 /// ```
939 /// use async_process::Command;
940 ///
941 /// let mut cmd = Command::new("ls");
942 /// cmd.env_remove("PATH");
943 /// ```
944 pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command {
945 self.inner.env_remove(key);
946 self
947 }
948
949 /// Removes all environment variable mappings.
950 ///
951 /// # Examples
952 ///
953 /// ```
954 /// use async_process::Command;
955 ///
956 /// let mut cmd = Command::new("ls");
957 /// cmd.env_clear();
958 /// ```
959 pub fn env_clear(&mut self) -> &mut Command {
960 self.inner.env_clear();
961 self
962 }
963
964 /// Configures the working directory for the new process.
965 ///
966 /// # Examples
967 ///
968 /// ```
969 /// use async_process::Command;
970 ///
971 /// let mut cmd = Command::new("ls");
972 /// cmd.current_dir("/");
973 /// ```
974 pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command {
975 self.inner.current_dir(dir);
976 self
977 }
978
979 /// Configures the standard input (stdin) for the new process.
980 ///
981 /// # Examples
982 ///
983 /// ```
984 /// use async_process::{Command, Stdio};
985 ///
986 /// let mut cmd = Command::new("cat");
987 /// cmd.stdin(Stdio::null());
988 /// ```
989 pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
990 self.stdin = true;
991 self.inner.stdin(cfg);
992 self
993 }
994
995 /// Configures the standard output (stdout) for the new process.
996 ///
997 /// # Examples
998 ///
999 /// ```
1000 /// use async_process::{Command, Stdio};
1001 ///
1002 /// let mut cmd = Command::new("ls");
1003 /// cmd.stdout(Stdio::piped());
1004 /// ```
1005 pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
1006 self.stdout = true;
1007 self.inner.stdout(cfg);
1008 self
1009 }
1010
1011 /// Configures the standard error (stderr) for the new process.
1012 ///
1013 /// # Examples
1014 ///
1015 /// ```
1016 /// use async_process::{Command, Stdio};
1017 ///
1018 /// let mut cmd = Command::new("ls");
1019 /// cmd.stderr(Stdio::piped());
1020 /// ```
1021 pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
1022 self.stderr = true;
1023 self.inner.stderr(cfg);
1024 self
1025 }
1026
1027 /// Configures whether to reap the zombie process when [`Child`] is dropped.
1028 ///
1029 /// When the process finishes, it becomes a "zombie" and some resources associated with it
1030 /// remain until [`Child::try_status()`], [`Child::status()`], or [`Child::output()`] collects
1031 /// its exit code.
1032 ///
1033 /// If its exit code is never collected, the resources may leak forever. This crate has a
1034 /// background thread named "async-process" that collects such "zombie" processes and then
1035 /// "reaps" them, thus preventing the resource leaks.
1036 ///
1037 /// The default value of this option is `true`.
1038 ///
1039 /// # Examples
1040 ///
1041 /// ```
1042 /// use async_process::{Command, Stdio};
1043 ///
1044 /// let mut cmd = Command::new("cat");
1045 /// cmd.reap_on_drop(false);
1046 /// ```
1047 pub fn reap_on_drop(&mut self, reap_on_drop: bool) -> &mut Command {
1048 self.reap_on_drop = reap_on_drop;
1049 self
1050 }
1051
1052 /// Configures whether to kill the process when [`Child`] is dropped.
1053 ///
1054 /// The default value of this option is `false`.
1055 ///
1056 /// # Examples
1057 ///
1058 /// ```
1059 /// use async_process::{Command, Stdio};
1060 ///
1061 /// let mut cmd = Command::new("cat");
1062 /// cmd.kill_on_drop(true);
1063 /// ```
1064 pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command {
1065 self.kill_on_drop = kill_on_drop;
1066 self
1067 }
1068
1069 /// Executes the command and returns the [`Child`] handle to it.
1070 ///
1071 /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1072 ///
1073 /// # Examples
1074 ///
1075 /// ```no_run
1076 /// # futures_lite::future::block_on(async {
1077 /// use async_process::Command;
1078 ///
1079 /// let child = Command::new("ls").spawn()?;
1080 /// # std::io::Result::Ok(()) });
1081 /// ```
1082 pub fn spawn(&mut self) -> io::Result<Child> {
1083 if !self.stdin {
1084 self.inner.stdin(Stdio::inherit());
1085 }
1086 if !self.stdout {
1087 self.inner.stdout(Stdio::inherit());
1088 }
1089 if !self.stderr {
1090 self.inner.stderr(Stdio::inherit());
1091 }
1092
1093 Child::new(self)
1094 }
1095
1096 /// Executes the command, waits for it to exit, and returns the exit status.
1097 ///
1098 /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1099 ///
1100 /// # Examples
1101 ///
1102 /// ```no_run
1103 /// # futures_lite::future::block_on(async {
1104 /// use async_process::Command;
1105 ///
1106 /// let status = Command::new("cp")
1107 /// .arg("a.txt")
1108 /// .arg("b.txt")
1109 /// .status()
1110 /// .await?;
1111 /// # std::io::Result::Ok(()) });
1112 /// ```
1113 pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
1114 let child = self.spawn();
1115 async { child?.status().await }
1116 }
1117
1118 /// Executes the command and collects its output.
1119 ///
1120 /// If not configured, stdin will be set to [`Stdio::null()`], and stdout and stderr will be
1121 /// set to [`Stdio::piped()`].
1122 ///
1123 /// # Examples
1124 ///
1125 /// ```no_run
1126 /// # futures_lite::future::block_on(async {
1127 /// use async_process::Command;
1128 ///
1129 /// let output = Command::new("cat")
1130 /// .arg("a.txt")
1131 /// .output()
1132 /// .await?;
1133 /// # std::io::Result::Ok(()) });
1134 /// ```
1135 pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> {
1136 if !self.stdin {
1137 self.inner.stdin(Stdio::null());
1138 }
1139 if !self.stdout {
1140 self.inner.stdout(Stdio::piped());
1141 }
1142 if !self.stderr {
1143 self.inner.stderr(Stdio::piped());
1144 }
1145
1146 let child = Child::new(self);
1147 async { child?.output().await }
1148 }
1149}
1150
1151impl From<std::process::Command> for Command {
1152 fn from(inner: std::process::Command) -> Self {
1153 Self {
1154 inner,
1155 stdin: false,
1156 stdout: false,
1157 stderr: false,
1158 reap_on_drop: true,
1159 kill_on_drop: false,
1160 }
1161 }
1162}
1163
1164impl fmt::Debug for Command {
1165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1166 if f.alternate() {
1167 f.debug_struct("Command")
1168 .field("inner", &self.inner)
1169 .field("stdin", &self.stdin)
1170 .field("stdout", &self.stdout)
1171 .field("stderr", &self.stderr)
1172 .field("reap_on_drop", &self.reap_on_drop)
1173 .field("kill_on_drop", &self.kill_on_drop)
1174 .finish()
1175 } else {
1176 // Stdlib outputs command-line in Debug for Command. This does the
1177 // same, if not in "alternate" (long pretty-printed) mode.
1178 // This is useful for logs, for example.
1179 fmt::Debug::fmt(&self.inner, f)
1180 }
1181 }
1182}
1183
1184/// Moves `Fd` out of non-blocking mode.
1185#[cfg(unix)]
1186fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
1187 cfg_if::cfg_if! {
1188 // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
1189 // for now, as with the standard library, because it seems to behave
1190 // differently depending on the platform.
1191 // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
1192 // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
1193 // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
1194 if #[cfg(target_os = "linux")] {
1195 rustix::io::ioctl_fionbio(fd, false)?;
1196 } else {
1197 let previous = rustix::fs::fcntl_getfl(fd)?;
1198 let new = previous & !rustix::fs::OFlags::NONBLOCK;
1199 if new != previous {
1200 rustix::fs::fcntl_setfl(fd, new)?;
1201 }
1202 }
1203 }
1204 Ok(())
1205}
1206
/// Guard that invokes the wrapped closure when it is dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    /// Calls the stored closure.
    fn drop(&mut self) {
        let CallOnDrop(on_drop) = self;
        on_drop();
    }
}
1214
#[cfg(test)]
mod test {
    use super::{driver, Command};
    use futures_lite::future;
    use futures_lite::prelude::*;
    use std::sync::atomic::Ordering;

    /// Builds a shell command that prints `hello`.
    #[cfg(unix)]
    fn command() -> Command {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg("echo hello");
        cmd
    }

    /// Builds a shell command that prints `hello`.
    #[cfg(windows)]
    fn command() -> Command {
        let mut cmd = Command::new("cmd");
        cmd.arg("/C").arg("echo hello");
        cmd
    }

    /// Bytes expected on the stdout of [`command`].
    #[cfg(unix)]
    const OUTPUT: &[u8] = b"hello\n";
    /// Bytes expected on the stdout of [`command`].
    #[cfg(windows)]
    const OUTPUT: &[u8] = b"hello\r\n";

    /// Reads the flag that records whether the driver thread has been spawned.
    fn is_thread_spawned() -> bool {
        super::DRIVER_THREAD_SPAWNED.load(Ordering::SeqCst)
    }

    /// Runs [`command`] to completion and checks what it printed.
    async fn assert_echo_output() {
        let output = command().output().await.unwrap();
        assert_eq!(output.stdout, OUTPUT);
    }

    #[test]
    fn polled_driver() {
        future::block_on(async {
            // The thread must not exist before anything has been done.
            assert!(!is_thread_spawned());

            // Create a first driver and give it a single poll.
            let mut first = Box::pin(driver());
            future::poll_once(&mut first).await;
            assert!(!is_thread_spawned());

            // A process future can complete while the driver is polled next to it.
            async {
                (&mut first).await;
            }
            .or(assert_echo_output())
            .await;
            assert!(!is_thread_spawned());

            // Create a second driver and give it a single poll as well.
            let mut second = Box::pin(driver());
            future::poll_once(&mut second).await;
            assert!(!is_thread_spawned());

            // Both drivers polled side by side, together with a process future.
            async {
                (&mut first).await;
            }
            .or(async {
                (&mut second).await;
            })
            .or(assert_echo_output())
            .await;
            assert!(!is_thread_spawned());

            // After one driver goes away, the remaining one should take over.
            drop(first);
            assert!(!is_thread_spawned());

            // The second driver alone, next to a process future.
            async {
                (&mut second).await;
            }
            .or(assert_echo_output())
            .await;
            assert!(!is_thread_spawned());

            // Dropping the last driver must not spawn the thread, as there are no active
            // child processes.
            drop(second);
            assert!(!is_thread_spawned());

            // With no driver left, running a process future on its own spawns the thread.
            assert_echo_output().await;
            assert!(is_thread_spawned());
        });
    }
}