async_process/
lib.rs

1//! Async interface for working with processes.
2//!
3//! This crate is an async version of [`std::process`].
4//!
5//! # Implementation
6//!
7//! A background thread named "async-process" is lazily created on first use, which waits for
8//! spawned child processes to exit and then calls the `wait()` syscall to clean up the "zombie"
9//! processes. This is unlike the `process` API in the standard library, where dropping a running
10//! `Child` leaks its resources.
11//!
12//! This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O
13//! on Windows.
14//!
15//! [`async-io`]: https://docs.rs/async-io
16//! [`blocking`]: https://docs.rs/blocking
17//!
18//! # Examples
19//!
20//! Spawn a process and collect its output:
21//!
22//! ```no_run
23//! # futures_lite::future::block_on(async {
24//! use async_process::Command;
25//!
26//! let out = Command::new("echo").arg("hello").arg("world").output().await?;
27//! assert_eq!(out.stdout, b"hello world\n");
28//! # std::io::Result::Ok(()) });
29//! ```
30//!
31//! Read the output line-by-line as it gets produced:
32//!
33//! ```no_run
34//! # futures_lite::future::block_on(async {
35//! use async_process::{Command, Stdio};
36//! use futures_lite::{io::BufReader, prelude::*};
37//!
38//! let mut child = Command::new("find")
39//!     .arg(".")
40//!     .stdout(Stdio::piped())
41//!     .spawn()?;
42//!
43//! let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();
44//!
45//! while let Some(line) = lines.next().await {
46//!     println!("{}", line?);
47//! }
48//! # std::io::Result::Ok(()) });
49//! ```
50
51#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
52#![doc(
53    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55#![doc(
56    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
57)]
58
59use std::convert::Infallible;
60use std::ffi::OsStr;
61use std::fmt;
62use std::path::Path;
63use std::pin::Pin;
64use std::sync::atomic::{AtomicUsize, Ordering};
65use std::sync::{Arc, Mutex, OnceLock};
66use std::task::{Context, Poll};
67use std::thread;
68
69#[cfg(unix)]
70use async_io::Async;
71#[cfg(unix)]
72use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
73
74#[cfg(windows)]
75use blocking::Unblock;
76
77use futures_lite::{future, io, prelude::*};
78
79#[doc(no_inline)]
80pub use std::process::{ExitStatus, Output, Stdio};
81
82#[cfg(unix)]
83pub mod unix;
84#[cfg(windows)]
85pub mod windows;
86
87mod reaper;
88
89mod sealed {
90    pub trait Sealed {}
91}
92
93#[cfg(test)]
94static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
95    std::sync::atomic::AtomicBool::new(false);
96
97/// The zombie process reaper.
98///
99/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
100struct Reaper {
101    /// Underlying system reaper.
102    sys: reaper::Reaper,
103
104    /// The number of tasks polling the SIGCHLD event.
105    ///
106    /// If this is zero, the `async-process` thread must be spawned.
107    drivers: AtomicUsize,
108
109    /// Number of live `Child` instances currently running.
110    ///
111    /// This is used to prevent the reaper thread from being spawned right as the program closes,
112    /// when the reaper thread isn't needed. This represents the number of active processes.
113    child_count: AtomicUsize,
114}
115
116impl Reaper {
117    /// Get the singleton instance of the reaper.
118    fn get() -> &'static Self {
119        static REAPER: OnceLock<Reaper> = OnceLock::new();
120
121        REAPER.get_or_init(|| Reaper {
122            sys: reaper::Reaper::new(),
123            drivers: AtomicUsize::new(0),
124            child_count: AtomicUsize::new(0),
125        })
126    }
127
128    /// Ensure that the reaper is driven.
129    ///
130    /// If there are no active `driver()` callers, this will spawn the `async-process` thread.
131    #[inline]
132    fn ensure_driven(&'static self) {
133        if self
134            .drivers
135            .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
136            .is_ok()
137        {
138            self.start_driver_thread();
139        }
140    }
141
142    /// Start the `async-process` thread.
143    #[cold]
144    fn start_driver_thread(&'static self) {
145        #[cfg(test)]
146        DRIVER_THREAD_SPAWNED
147            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
148            .unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
149
150        thread::Builder::new()
151            .name("async-process".to_string())
152            .spawn(move || {
153                let driver = async move {
154                    // No need to bump self.drivers, it was already bumped in ensure_driven.
155                    let guard = self.sys.lock().await;
156                    self.sys.reap(guard).await
157                };
158
159                #[cfg(unix)]
160                async_io::block_on(driver);
161
162                #[cfg(not(unix))]
163                future::block_on(driver);
164            })
165            .expect("cannot spawn async-process thread");
166    }
167
168    /// Register a process with this reaper.
169    fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
170        self.ensure_driven();
171        self.sys.register(child)
172    }
173}
174
175cfg_if::cfg_if! {
176    if #[cfg(windows)] {
177        // Wraps a sync I/O type into an async I/O type.
178        fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
179            Ok(Unblock::new(io))
180        }
181    } else if #[cfg(unix)] {
182        /// Wrap a file descriptor into a non-blocking I/O type.
183        fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
184            Async::new(io)
185        }
186    }
187}
188
189/// A guard that can kill child processes, or push them into the zombie list.
190struct ChildGuard {
191    inner: reaper::ChildGuard,
192    reap_on_drop: bool,
193    kill_on_drop: bool,
194    reaper: &'static Reaper,
195}
196
197impl ChildGuard {
198    fn get_mut(&mut self) -> &mut std::process::Child {
199        self.inner.get_mut()
200    }
201}
202
203// When the last reference to the child process is dropped, push it into the zombie list.
204impl Drop for ChildGuard {
205    fn drop(&mut self) {
206        if self.kill_on_drop {
207            self.get_mut().kill().ok();
208        }
209        if self.reap_on_drop {
210            self.inner.reap(&self.reaper.sys);
211        }
212
213        // Decrement number of children.
214        self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
215    }
216}
217
218/// A spawned child process.
219///
220/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
221/// [`output()`][`Child::output()`] to wait for it to exit.
222///
223/// If the [`Child`] is dropped, the process keeps running in the background.
224///
225/// # Examples
226///
227/// Spawn a process and wait for it to complete:
228///
229/// ```no_run
230/// # futures_lite::future::block_on(async {
231/// use async_process::Command;
232///
233/// Command::new("cp").arg("a.txt").arg("b.txt").status().await?;
234/// # std::io::Result::Ok(()) });
235/// ```
236pub struct Child {
237    /// The handle for writing to the child's standard input (stdin), if it has been captured.
238    pub stdin: Option<ChildStdin>,
239
240    /// The handle for reading from the child's standard output (stdout), if it has been captured.
241    pub stdout: Option<ChildStdout>,
242
243    /// The handle for reading from the child's standard error (stderr), if it has been captured.
244    pub stderr: Option<ChildStderr>,
245
246    /// The inner child process handle.
247    child: Arc<Mutex<ChildGuard>>,
248}
249
250impl Child {
251    /// Wraps the inner child process handle and registers it in the global process list.
252    ///
253    /// The "async-process" thread waits for processes in the global list and cleans up the
254    /// resources when they exit.
255    fn new(cmd: &mut Command) -> io::Result<Child> {
256        // Make sure the reaper exists before we spawn the child process.
257        let reaper = Reaper::get();
258        let mut child = cmd.inner.spawn()?;
259
260        // Convert sync I/O types into async I/O types.
261        let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
262        let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
263        let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
264
265        // Bump the child count.
266        reaper.child_count.fetch_add(1, Ordering::Relaxed);
267
268        // Register the child process in the global list.
269        let inner = reaper.register(child)?;
270
271        Ok(Child {
272            stdin,
273            stdout,
274            stderr,
275            child: Arc::new(Mutex::new(ChildGuard {
276                inner,
277                reap_on_drop: cmd.reap_on_drop,
278                kill_on_drop: cmd.kill_on_drop,
279                reaper,
280            })),
281        })
282    }
283
284    /// Returns the OS-assigned process identifier associated with this child.
285    ///
286    /// # Examples
287    ///
288    /// ```no_run
289    /// # futures_lite::future::block_on(async {
290    /// use async_process::Command;
291    ///
292    /// let mut child = Command::new("ls").spawn()?;
293    /// println!("id: {}", child.id());
294    /// # std::io::Result::Ok(()) });
295    /// ```
296    pub fn id(&self) -> u32 {
297        self.child.lock().unwrap().get_mut().id()
298    }
299
300    /// Forces the child process to exit.
301    ///
302    /// If the child has already exited, an [`InvalidInput`] error is returned.
303    ///
304    /// This is equivalent to sending a SIGKILL on Unix platforms.
305    ///
306    /// [`InvalidInput`]: `std::io::ErrorKind::InvalidInput`
307    ///
308    /// # Examples
309    ///
310    /// ```no_run
311    /// # futures_lite::future::block_on(async {
312    /// use async_process::Command;
313    ///
314    /// let mut child = Command::new("yes").spawn()?;
315    /// child.kill()?;
316    /// println!("exit status: {}", child.status().await?);
317    /// # std::io::Result::Ok(()) });
318    /// ```
319    pub fn kill(&mut self) -> io::Result<()> {
320        self.child.lock().unwrap().get_mut().kill()
321    }
322
323    /// Returns the exit status if the process has exited.
324    ///
325    /// Unlike [`status()`][`Child::status()`], this method will not drop the stdin handle.
326    ///
327    /// # Examples
328    ///
329    /// ```no_run
330    /// # futures_lite::future::block_on(async {
331    /// use async_process::Command;
332    ///
333    /// let mut child = Command::new("ls").spawn()?;
334    ///
335    /// match child.try_status()? {
336    ///     None => println!("still running"),
337    ///     Some(status) => println!("exited with: {}", status),
338    /// }
339    /// # std::io::Result::Ok(()) });
340    /// ```
341    pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
342        self.child.lock().unwrap().get_mut().try_wait()
343    }
344
345    /// Drops the stdin handle and waits for the process to exit.
346    ///
347    /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
348    /// not block waiting for input from the parent process while the parent waits for the child to
349    /// exit.
350    ///
351    /// # Examples
352    ///
353    /// ```no_run
354    /// # futures_lite::future::block_on(async {
355    /// use async_process::{Command, Stdio};
356    ///
357    /// let mut child = Command::new("cp")
358    ///     .arg("a.txt")
359    ///     .arg("b.txt")
360    ///     .spawn()?;
361    ///
362    /// println!("exit status: {}", child.status().await?);
363    /// # std::io::Result::Ok(()) });
364    /// ```
365    pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
366        self.stdin.take();
367        let child = self.child.clone();
368
369        async move { Reaper::get().sys.status(&child).await }
370    }
371
372    /// Drops the stdin handle and collects the output of the process.
373    ///
374    /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
375    /// not block waiting for input from the parent process while the parent waits for the child to
376    /// exit.
377    ///
378    /// In order to capture the output of the process, [`Command::stdout()`] and
379    /// [`Command::stderr()`] must be configured with [`Stdio::piped()`].
380    ///
381    /// # Examples
382    ///
383    /// ```no_run
384    /// # futures_lite::future::block_on(async {
385    /// use async_process::{Command, Stdio};
386    ///
387    /// let child = Command::new("ls")
388    ///     .stdout(Stdio::piped())
389    ///     .stderr(Stdio::piped())
390    ///     .spawn()?;
391    ///
392    /// let out = child.output().await?;
393    /// # std::io::Result::Ok(()) });
394    /// ```
395    pub fn output(mut self) -> impl Future<Output = io::Result<Output>> {
396        // A future that waits for the exit status.
397        let status = self.status();
398
399        // A future that collects stdout.
400        let stdout = self.stdout.take();
401        let stdout = async move {
402            let mut v = Vec::new();
403            if let Some(mut s) = stdout {
404                s.read_to_end(&mut v).await?;
405            }
406            io::Result::Ok(v)
407        };
408
409        // A future that collects stderr.
410        let stderr = self.stderr.take();
411        let stderr = async move {
412            let mut v = Vec::new();
413            if let Some(mut s) = stderr {
414                s.read_to_end(&mut v).await?;
415            }
416            io::Result::Ok(v)
417        };
418
419        async move {
420            let (stdout, stderr) = future::try_zip(stdout, stderr).await?;
421            let status = status.await?;
422            Ok(Output {
423                status,
424                stdout,
425                stderr,
426            })
427        }
428    }
429}
430
431impl fmt::Debug for Child {
432    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
433        f.debug_struct("Child")
434            .field("stdin", &self.stdin)
435            .field("stdout", &self.stdout)
436            .field("stderr", &self.stderr)
437            .finish()
438    }
439}
440
441/// A handle to a child process's standard input (stdin).
442///
443/// When a [`ChildStdin`] is dropped, the underlying handle gets closed. If the child process was
444/// previously blocked on input, it becomes unblocked after dropping.
445#[derive(Debug)]
446pub struct ChildStdin(
447    #[cfg(windows)] Unblock<std::process::ChildStdin>,
448    #[cfg(unix)] Async<std::process::ChildStdin>,
449);
450
451impl ChildStdin {
452    /// Convert async_process::ChildStdin into std::process::Stdio.
453    ///
454    /// You can use it to associate to the next process.
455    ///
456    /// # Examples
457    ///
458    /// ```no_run
459    /// # futures_lite::future::block_on(async {
460    /// use async_process::Command;
461    /// use std::process::Stdio;
462    ///
463    /// let mut ls_child = Command::new("ls").stdin(Stdio::piped()).spawn()?;
464    /// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?;
465    ///
466    /// let mut echo_child = Command::new("echo").arg("./").stdout(stdio).spawn()?;
467    ///
468    /// # std::io::Result::Ok(()) });
469    /// ```
470    pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
471        cfg_if::cfg_if! {
472            if #[cfg(windows)] {
473                Ok(self.0.into_inner().await.into())
474            } else if #[cfg(unix)] {
475                let child_stdin = self.0.into_inner()?;
476                blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?;
477                Ok(child_stdin.into())
478            }
479        }
480    }
481}
482
483impl io::AsyncWrite for ChildStdin {
484    fn poll_write(
485        mut self: Pin<&mut Self>,
486        cx: &mut Context<'_>,
487        buf: &[u8],
488    ) -> Poll<io::Result<usize>> {
489        Pin::new(&mut self.0).poll_write(cx, buf)
490    }
491
492    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
493        Pin::new(&mut self.0).poll_flush(cx)
494    }
495
496    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
497        Pin::new(&mut self.0).poll_close(cx)
498    }
499}
500
501#[cfg(unix)]
502impl AsRawFd for ChildStdin {
503    fn as_raw_fd(&self) -> RawFd {
504        self.0.as_raw_fd()
505    }
506}
507
508#[cfg(unix)]
509impl AsFd for ChildStdin {
510    fn as_fd(&self) -> BorrowedFd<'_> {
511        self.0.as_fd()
512    }
513}
514
515#[cfg(unix)]
516impl TryFrom<ChildStdin> for OwnedFd {
517    type Error = io::Error;
518
519    fn try_from(value: ChildStdin) -> Result<Self, Self::Error> {
520        value.0.try_into()
521    }
522}
523
524// TODO(notgull): Add mirroring AsRawHandle impls for all of the child handles
525//
526// at the moment this is pretty hard to do because of how they're wrapped in
527// Unblock, meaning that we can't always access the underlying handle. async-fs
528// gets around this by putting the handle in an Arc, but there's still some decision
529// to be made about how to handle this (no pun intended)
530
531/// A handle to a child process's standard output (stdout).
532///
533/// When a [`ChildStdout`] is dropped, the underlying handle gets closed.
534#[derive(Debug)]
535pub struct ChildStdout(
536    #[cfg(windows)] Unblock<std::process::ChildStdout>,
537    #[cfg(unix)] Async<std::process::ChildStdout>,
538);
539
540impl ChildStdout {
541    /// Convert async_process::ChildStdout into std::process::Stdio.
542    ///
543    /// You can use it to associate to the next process.
544    ///
545    /// # Examples
546    ///
547    /// ```no_run
548    /// # futures_lite::future::block_on(async {
549    /// use async_process::Command;
550    /// use std::process::Stdio;
551    /// use std::io::Read;
552    /// use futures_lite::AsyncReadExt;
553    ///
554    /// let mut ls_child = Command::new("ls").stdout(Stdio::piped()).spawn()?;
555    /// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
556    ///
557    /// let mut echo_child = Command::new("echo").stdin(stdio).stdout(Stdio::piped()).spawn()?;
558    /// let mut buf = vec![];
559    /// echo_child.stdout.take().unwrap().read(&mut buf).await;
560    /// # std::io::Result::Ok(()) });
561    /// ```
562    pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
563        cfg_if::cfg_if! {
564            if #[cfg(windows)] {
565                Ok(self.0.into_inner().await.into())
566            } else if #[cfg(unix)] {
567                let child_stdout = self.0.into_inner()?;
568                blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?;
569                Ok(child_stdout.into())
570            }
571        }
572    }
573}
574
575impl io::AsyncRead for ChildStdout {
576    fn poll_read(
577        mut self: Pin<&mut Self>,
578        cx: &mut Context<'_>,
579        buf: &mut [u8],
580    ) -> Poll<io::Result<usize>> {
581        Pin::new(&mut self.0).poll_read(cx, buf)
582    }
583}
584
585#[cfg(unix)]
586impl AsRawFd for ChildStdout {
587    fn as_raw_fd(&self) -> RawFd {
588        self.0.as_raw_fd()
589    }
590}
591
592#[cfg(unix)]
593impl AsFd for ChildStdout {
594    fn as_fd(&self) -> BorrowedFd<'_> {
595        self.0.as_fd()
596    }
597}
598
599#[cfg(unix)]
600impl TryFrom<ChildStdout> for OwnedFd {
601    type Error = io::Error;
602
603    fn try_from(value: ChildStdout) -> Result<Self, Self::Error> {
604        value.0.try_into()
605    }
606}
607
608/// A handle to a child process's standard error (stderr).
609///
610/// When a [`ChildStderr`] is dropped, the underlying handle gets closed.
611#[derive(Debug)]
612pub struct ChildStderr(
613    #[cfg(windows)] Unblock<std::process::ChildStderr>,
614    #[cfg(unix)] Async<std::process::ChildStderr>,
615);
616
617impl ChildStderr {
618    /// Convert async_process::ChildStderr into std::process::Stdio.
619    ///
620    /// You can use it to associate to the next process.
621    ///
622    /// # Examples
623    ///
624    /// ```no_run
625    /// # futures_lite::future::block_on(async {
626    /// use async_process::Command;
627    /// use std::process::Stdio;
628    ///
629    /// let mut ls_child = Command::new("ls").arg("x").stderr(Stdio::piped()).spawn()?;
630    /// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?;
631    ///
632    /// let mut echo_child = Command::new("echo").stdin(stdio).spawn()?;
633    /// # std::io::Result::Ok(()) });
634    /// ```
635    pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
636        cfg_if::cfg_if! {
637            if #[cfg(windows)] {
638                Ok(self.0.into_inner().await.into())
639            } else if #[cfg(unix)] {
640                let child_stderr = self.0.into_inner()?;
641                blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?;
642                Ok(child_stderr.into())
643            }
644        }
645    }
646}
647
648impl io::AsyncRead for ChildStderr {
649    fn poll_read(
650        mut self: Pin<&mut Self>,
651        cx: &mut Context<'_>,
652        buf: &mut [u8],
653    ) -> Poll<io::Result<usize>> {
654        Pin::new(&mut self.0).poll_read(cx, buf)
655    }
656}
657
658#[cfg(unix)]
659impl AsRawFd for ChildStderr {
660    fn as_raw_fd(&self) -> RawFd {
661        self.0.as_raw_fd()
662    }
663}
664
665#[cfg(unix)]
666impl AsFd for ChildStderr {
667    fn as_fd(&self) -> BorrowedFd<'_> {
668        self.0.as_fd()
669    }
670}
671
672#[cfg(unix)]
673impl TryFrom<ChildStderr> for OwnedFd {
674    type Error = io::Error;
675
676    fn try_from(value: ChildStderr) -> Result<Self, Self::Error> {
677        value.0.try_into()
678    }
679}
680
681/// Runs the driver for the asynchronous processes.
682///
683/// This future takes control of global structures related to driving [`Child`]ren and reaping
684/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
685/// making sure zombie processes are successfully waited on.
686///
687/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
688/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
689/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
690/// will be spawned. The "async-process" thread just blocks on this future and will automatically
691/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
692///
693/// This future will never complete. It is intended to be ran on a background task in your
694/// executor of choice.
695///
696/// # Examples
697///
698/// ```no_run
699/// use async_executor::Executor;
700/// use async_process::{driver, Command};
701///
702/// # futures_lite::future::block_on(async {
703/// // Create an executor and run on it.
704/// let ex = Executor::new();
705/// ex.run(async {
706///     // Run the driver future in the background.
707///     ex.spawn(driver()).detach();
708///
709///     // Run a command.
710///     Command::new("ls").output().await.ok();
711/// }).await;
712/// # });
713/// ```
714#[allow(clippy::manual_async_fn)]
715#[inline]
716pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
717    async {
718        // Get the reaper.
719        let reaper = Reaper::get();
720
721        // Make sure the reaper knows we're driving it.
722        reaper.drivers.fetch_add(1, Ordering::SeqCst);
723
724        // Decrement the driver count when this future is dropped.
725        let _guard = CallOnDrop(|| {
726            let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);
727
728            // If this was the last driver, and there are still resources actively using the
729            // reaper, make sure that there is a thread driving the reaper.
730            if prev_count == 1
731                && (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
732            {
733                reaper.ensure_driven();
734            }
735        });
736
737        // Acquire the reaper lock and start polling the SIGCHLD event.
738        let guard = reaper.sys.lock().await;
739        reaper.sys.reap(guard).await
740    }
741}
742
743/// A builder for spawning processes.
744///
745/// # Examples
746///
747/// ```no_run
748/// # futures_lite::future::block_on(async {
749/// use async_process::Command;
750///
751/// let output = if cfg!(target_os = "windows") {
752///     Command::new("cmd").args(&["/C", "echo hello"]).output().await?
753/// } else {
754///     Command::new("sh").arg("-c").arg("echo hello").output().await?
755/// };
756/// # std::io::Result::Ok(()) });
757/// ```
758pub struct Command {
759    inner: std::process::Command,
760    stdin: bool,
761    stdout: bool,
762    stderr: bool,
763    reap_on_drop: bool,
764    kill_on_drop: bool,
765}
766
767impl Command {
768    /// Constructs a new [`Command`] for launching `program`.
769    ///
770    /// The initial configuration (the working directory and environment variables) is inherited
771    /// from the current process.
772    ///
773    /// # Examples
774    ///
775    /// ```
776    /// use async_process::Command;
777    ///
778    /// let mut cmd = Command::new("ls");
779    /// ```
780    pub fn new<S: AsRef<OsStr>>(program: S) -> Command {
781        Self::from(std::process::Command::new(program))
782    }
783
784    /// Adds a single argument to pass to the program.
785    ///
786    /// # Examples
787    ///
788    /// ```
789    /// use async_process::Command;
790    ///
791    /// let mut cmd = Command::new("echo");
792    /// cmd.arg("hello");
793    /// cmd.arg("world");
794    /// ```
795    pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command {
796        self.inner.arg(arg);
797        self
798    }
799
800    /// Adds multiple arguments to pass to the program.
801    ///
802    /// # Examples
803    ///
804    /// ```
805    /// use async_process::Command;
806    ///
807    /// let mut cmd = Command::new("echo");
808    /// cmd.args(&["hello", "world"]);
809    /// ```
810    pub fn args<I, S>(&mut self, args: I) -> &mut Command
811    where
812        I: IntoIterator<Item = S>,
813        S: AsRef<OsStr>,
814    {
815        self.inner.args(args);
816        self
817    }
818
819    /// Configures an environment variable for the new process.
820    ///
821    /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
822    /// and case-sensitive on all other platforms.
823    ///
824    /// # Examples
825    ///
826    /// ```
827    /// use async_process::Command;
828    ///
829    /// let mut cmd = Command::new("ls");
830    /// cmd.env("PATH", "/bin");
831    /// ```
832    pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command
833    where
834        K: AsRef<OsStr>,
835        V: AsRef<OsStr>,
836    {
837        self.inner.env(key, val);
838        self
839    }
840
841    /// Configures multiple environment variables for the new process.
842    ///
843    /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
844    /// and case-sensitive on all other platforms.
845    ///
846    /// # Examples
847    ///
848    /// ```
849    /// use async_process::Command;
850    ///
851    /// let mut cmd = Command::new("ls");
852    /// cmd.envs(vec![("PATH", "/bin"), ("TERM", "xterm-256color")]);
853    /// ```
854    pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command
855    where
856        I: IntoIterator<Item = (K, V)>,
857        K: AsRef<OsStr>,
858        V: AsRef<OsStr>,
859    {
860        self.inner.envs(vars);
861        self
862    }
863
864    /// Gets an iterator of the arguments that will be passed to the program.
865    ///
866    /// # Examples
867    ///
868    /// ```
869    /// use std::ffi::OsStr;
870    /// use async_process::Command;
871    ///
872    /// let mut cmd = Command::new("echo");
873    /// cmd.arg("first").arg("second");
874    /// let args: Vec<&OsStr> = cmd.get_args().collect();
875    /// assert_eq!(args, &["first", "second"]);
876    /// ```
877    pub fn get_args(&self) -> std::process::CommandArgs<'_> {
878        self.inner.get_args()
879    }
880
881    /// Gets an iterator of the environment variables explicitly set for the child process.
882    ///
883    /// # Examples
884    ///
885    /// ```
886    /// use std::ffi::OsStr;
887    /// use async_process::Command;
888    ///
889    /// let mut cmd = Command::new("ls");
890    /// cmd.env("TERM", "dumb").env_remove("TZ");
891    /// let envs: Vec<(&OsStr, Option<&OsStr>)> = cmd.get_envs().collect();
892    /// assert_eq!(envs, &[
893    ///     (OsStr::new("TERM"), Some(OsStr::new("dumb"))),
894    ///     (OsStr::new("TZ"), None)
895    /// ]);
896    /// ```
897    pub fn get_envs(&self) -> std::process::CommandEnvs<'_> {
898        self.inner.get_envs()
899    }
900
901    /// Gets the working directory for the child process.
902    ///
903    /// This returns [`None`] if the working directory will not be changed.
904    ///
905    /// # Examples
906    ///
907    /// ```
908    /// use std::path::Path;
909    /// use async_process::Command;
910    ///
911    /// let mut cmd = Command::new("ls");
912    /// assert_eq!(cmd.get_current_dir(), None);
913    /// cmd.current_dir("/bin");
914    /// assert_eq!(cmd.get_current_dir(), Some(Path::new("/bin")));
915    /// ```
916    pub fn get_current_dir(&self) -> Option<&Path> {
917        self.inner.get_current_dir()
918    }
919
920    /// Gets the path to the program that was given to [`Command::new`].
921    ///
922    /// # Examples
923    ///
924    /// ```
925    /// use async_process::Command;
926    ///
927    /// let cmd = Command::new("echo");
928    /// assert_eq!(cmd.get_program(), "echo");
929    /// ```
930    pub fn get_program(&self) -> &OsStr {
931        self.inner.get_program()
932    }
933
934    /// Removes an environment variable mapping.
935    ///
936    /// # Examples
937    ///
938    /// ```
939    /// use async_process::Command;
940    ///
941    /// let mut cmd = Command::new("ls");
942    /// cmd.env_remove("PATH");
943    /// ```
944    pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command {
945        self.inner.env_remove(key);
946        self
947    }
948
949    /// Removes all environment variable mappings.
950    ///
951    /// # Examples
952    ///
953    /// ```
954    /// use async_process::Command;
955    ///
956    /// let mut cmd = Command::new("ls");
957    /// cmd.env_clear();
958    /// ```
959    pub fn env_clear(&mut self) -> &mut Command {
960        self.inner.env_clear();
961        self
962    }
963
964    /// Configures the working directory for the new process.
965    ///
966    /// # Examples
967    ///
968    /// ```
969    /// use async_process::Command;
970    ///
971    /// let mut cmd = Command::new("ls");
972    /// cmd.current_dir("/");
973    /// ```
974    pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command {
975        self.inner.current_dir(dir);
976        self
977    }
978
979    /// Configures the standard input (stdin) for the new process.
980    ///
981    /// # Examples
982    ///
983    /// ```
984    /// use async_process::{Command, Stdio};
985    ///
986    /// let mut cmd = Command::new("cat");
987    /// cmd.stdin(Stdio::null());
988    /// ```
989    pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
990        self.stdin = true;
991        self.inner.stdin(cfg);
992        self
993    }
994
995    /// Configures the standard output (stdout) for the new process.
996    ///
997    /// # Examples
998    ///
999    /// ```
1000    /// use async_process::{Command, Stdio};
1001    ///
1002    /// let mut cmd = Command::new("ls");
1003    /// cmd.stdout(Stdio::piped());
1004    /// ```
1005    pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
1006        self.stdout = true;
1007        self.inner.stdout(cfg);
1008        self
1009    }
1010
1011    /// Configures the standard error (stderr) for the new process.
1012    ///
1013    /// # Examples
1014    ///
1015    /// ```
1016    /// use async_process::{Command, Stdio};
1017    ///
1018    /// let mut cmd = Command::new("ls");
1019    /// cmd.stderr(Stdio::piped());
1020    /// ```
1021    pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
1022        self.stderr = true;
1023        self.inner.stderr(cfg);
1024        self
1025    }
1026
1027    /// Configures whether to reap the zombie process when [`Child`] is dropped.
1028    ///
1029    /// When the process finishes, it becomes a "zombie" and some resources associated with it
1030    /// remain until [`Child::try_status()`], [`Child::status()`], or [`Child::output()`] collects
1031    /// its exit code.
1032    ///
1033    /// If its exit code is never collected, the resources may leak forever. This crate has a
1034    /// background thread named "async-process" that collects such "zombie" processes and then
1035    /// "reaps" them, thus preventing the resource leaks.
1036    ///
1037    /// The default value of this option is `true`.
1038    ///
1039    /// # Examples
1040    ///
1041    /// ```
1042    /// use async_process::{Command, Stdio};
1043    ///
1044    /// let mut cmd = Command::new("cat");
1045    /// cmd.reap_on_drop(false);
1046    /// ```
1047    pub fn reap_on_drop(&mut self, reap_on_drop: bool) -> &mut Command {
1048        self.reap_on_drop = reap_on_drop;
1049        self
1050    }
1051
1052    /// Configures whether to kill the process when [`Child`] is dropped.
1053    ///
1054    /// The default value of this option is `false`.
1055    ///
1056    /// # Examples
1057    ///
1058    /// ```
1059    /// use async_process::{Command, Stdio};
1060    ///
1061    /// let mut cmd = Command::new("cat");
1062    /// cmd.kill_on_drop(true);
1063    /// ```
1064    pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command {
1065        self.kill_on_drop = kill_on_drop;
1066        self
1067    }
1068
1069    /// Executes the command and returns the [`Child`] handle to it.
1070    ///
1071    /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1072    ///
1073    /// # Examples
1074    ///
1075    /// ```no_run
1076    /// # futures_lite::future::block_on(async {
1077    /// use async_process::Command;
1078    ///
1079    /// let child = Command::new("ls").spawn()?;
1080    /// # std::io::Result::Ok(()) });
1081    /// ```
1082    pub fn spawn(&mut self) -> io::Result<Child> {
1083        if !self.stdin {
1084            self.inner.stdin(Stdio::inherit());
1085        }
1086        if !self.stdout {
1087            self.inner.stdout(Stdio::inherit());
1088        }
1089        if !self.stderr {
1090            self.inner.stderr(Stdio::inherit());
1091        }
1092
1093        Child::new(self)
1094    }
1095
1096    /// Executes the command, waits for it to exit, and returns the exit status.
1097    ///
1098    /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```no_run
1103    /// # futures_lite::future::block_on(async {
1104    /// use async_process::Command;
1105    ///
1106    /// let status = Command::new("cp")
1107    ///     .arg("a.txt")
1108    ///     .arg("b.txt")
1109    ///     .status()
1110    ///     .await?;
1111    /// # std::io::Result::Ok(()) });
1112    /// ```
1113    pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
1114        let child = self.spawn();
1115        async { child?.status().await }
1116    }
1117
1118    /// Executes the command and collects its output.
1119    ///
1120    /// If not configured, stdin will be set to [`Stdio::null()`], and stdout and stderr will be
1121    /// set to [`Stdio::piped()`].
1122    ///
1123    /// # Examples
1124    ///
1125    /// ```no_run
1126    /// # futures_lite::future::block_on(async {
1127    /// use async_process::Command;
1128    ///
1129    /// let output = Command::new("cat")
1130    ///     .arg("a.txt")
1131    ///     .output()
1132    ///     .await?;
1133    /// # std::io::Result::Ok(()) });
1134    /// ```
1135    pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> {
1136        if !self.stdin {
1137            self.inner.stdin(Stdio::null());
1138        }
1139        if !self.stdout {
1140            self.inner.stdout(Stdio::piped());
1141        }
1142        if !self.stderr {
1143            self.inner.stderr(Stdio::piped());
1144        }
1145
1146        let child = Child::new(self);
1147        async { child?.output().await }
1148    }
1149}
1150
1151impl From<std::process::Command> for Command {
1152    fn from(inner: std::process::Command) -> Self {
1153        Self {
1154            inner,
1155            stdin: false,
1156            stdout: false,
1157            stderr: false,
1158            reap_on_drop: true,
1159            kill_on_drop: false,
1160        }
1161    }
1162}
1163
1164impl fmt::Debug for Command {
1165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1166        if f.alternate() {
1167            f.debug_struct("Command")
1168                .field("inner", &self.inner)
1169                .field("stdin", &self.stdin)
1170                .field("stdout", &self.stdout)
1171                .field("stderr", &self.stderr)
1172                .field("reap_on_drop", &self.reap_on_drop)
1173                .field("kill_on_drop", &self.kill_on_drop)
1174                .finish()
1175        } else {
1176            // Stdlib outputs command-line in Debug for Command. This does the
1177            // same, if not in "alternate" (long pretty-printed) mode.
1178            // This is useful for logs, for example.
1179            fmt::Debug::fmt(&self.inner, f)
1180        }
1181    }
1182}
1183
1184/// Moves `Fd` out of non-blocking mode.
1185#[cfg(unix)]
1186fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
1187    cfg_if::cfg_if! {
1188        // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
1189        // for now, as with the standard library, because it seems to behave
1190        // differently depending on the platform.
1191        // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
1192        // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
1193        // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
1194        if #[cfg(target_os = "linux")] {
1195            rustix::io::ioctl_fionbio(fd, false)?;
1196        } else {
1197            let previous = rustix::fs::fcntl_getfl(fd)?;
1198            let new = previous & !rustix::fs::OFlags::NONBLOCK;
1199            if new != previous {
1200                rustix::fs::fcntl_setfl(fd, new)?;
1201            }
1202        }
1203    }
1204    Ok(())
1205}
1206
1207struct CallOnDrop<F: FnMut()>(F);
1208
1209impl<F: FnMut()> Drop for CallOnDrop<F> {
1210    fn drop(&mut self) {
1211        (self.0)();
1212    }
1213}
1214
1215#[cfg(test)]
1216mod test {
1217    #[test]
1218    fn polled_driver() {
1219        use super::{driver, Command};
1220        use futures_lite::future;
1221        use futures_lite::prelude::*;
1222
1223        let is_thread_spawned =
1224            || super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst);
1225
1226        #[cfg(unix)]
1227        fn command() -> Command {
1228            let mut cmd = Command::new("sh");
1229            cmd.arg("-c").arg("echo hello");
1230            cmd
1231        }
1232
1233        #[cfg(windows)]
1234        fn command() -> Command {
1235            let mut cmd = Command::new("cmd");
1236            cmd.arg("/C").arg("echo hello");
1237            cmd
1238        }
1239
1240        #[cfg(unix)]
1241        const OUTPUT: &[u8] = b"hello\n";
1242        #[cfg(windows)]
1243        const OUTPUT: &[u8] = b"hello\r\n";
1244
1245        future::block_on(async {
1246            // Thread should not be spawned off the bat.
1247            assert!(!is_thread_spawned());
1248
1249            // Spawn a driver.
1250            let mut driver1 = Box::pin(driver());
1251            future::poll_once(&mut driver1).await;
1252            assert!(!is_thread_spawned());
1253
1254            // We should be able to run the driver in parallel with a process future.
1255            async {
1256                (&mut driver1).await;
1257            }
1258            .or(async {
1259                let output = command().output().await.unwrap();
1260                assert_eq!(output.stdout, OUTPUT);
1261            })
1262            .await;
1263            assert!(!is_thread_spawned());
1264
1265            // Spawn a second driver.
1266            let mut driver2 = Box::pin(driver());
1267            future::poll_once(&mut driver2).await;
1268            assert!(!is_thread_spawned());
1269
1270            // Poll both drivers in parallel.
1271            async {
1272                (&mut driver1).await;
1273            }
1274            .or(async {
1275                (&mut driver2).await;
1276            })
1277            .or(async {
1278                let output = command().output().await.unwrap();
1279                assert_eq!(output.stdout, OUTPUT);
1280            })
1281            .await;
1282            assert!(!is_thread_spawned());
1283
1284            // Once one is dropped, the other should take over.
1285            drop(driver1);
1286            assert!(!is_thread_spawned());
1287
1288            // Poll driver2 in parallel with a process future.
1289            async {
1290                (&mut driver2).await;
1291            }
1292            .or(async {
1293                let output = command().output().await.unwrap();
1294                assert_eq!(output.stdout, OUTPUT);
1295            })
1296            .await;
1297            assert!(!is_thread_spawned());
1298
1299            // Once driver2 is dropped, the thread should not be spawned, as there are no active
1300            // child processes..
1301            drop(driver2);
1302            assert!(!is_thread_spawned());
1303
1304            // We should now be able to poll the process future independently, it will spawn the
1305            // thread.
1306            let output = command().output().await.unwrap();
1307            assert_eq!(output.stdout, OUTPUT);
1308            assert!(is_thread_spawned());
1309        });
1310    }
1311}