Skip to main content

tokio_process_tools/process_handle/
mod.rs

1//! Spawned-process handle, stdin lifecycle, and process state queries.
2
3mod drop_guard;
4mod group;
5pub(crate) mod output_collection;
6mod replay;
7#[cfg(any(unix, windows))]
8mod signal;
9mod spawn;
10pub(crate) mod termination;
11mod wait;
12pub(crate) mod wait_builder;
13
14use crate::output_stream::OutputStream;
15use crate::process_handle::drop_guard::DropMode;
16use crate::process_handle::group::ProcessGroup;
17use std::borrow::Cow;
18use std::io;
19use std::mem::ManuallyDrop;
20use std::process::ExitStatus;
21use tokio::process::Child;
22use tokio::process::ChildStdin;
23
24/// Represents the `stdin` stream of a child process.
25///
26/// stdin is always configured as piped, so it starts as `Open` with a [`ChildStdin`] handle
27/// that can be used to write data to the process. It can be explicitly closed by calling
28/// [`Stdin::close`], or implicitly closed by the staged
29/// [`ProcessHandle::wait_for_completion`] builder and [`ProcessHandle::kill`], after which it
30/// transitions to the `Closed` state. Note that some operations (kill) close `stdin` early in
31/// order to match the behavior of tokio.
32#[derive(Debug)]
33pub enum Stdin {
34    /// `stdin` is open and available for writing.
35    Open(ChildStdin),
36
37    /// `stdin` has been closed.
38    Closed,
39}
40
41impl Stdin {
42    /// Returns `true` if stdin is open and available for writing.
43    #[must_use]
44    pub fn is_open(&self) -> bool {
45        matches!(self, Stdin::Open(_))
46    }
47
48    /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
49    pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
50        match self {
51            Stdin::Open(stdin) => Some(stdin),
52            Stdin::Closed => None,
53        }
54    }
55
56    /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
57    ///
58    /// This sends `EOF` to the child process. After calling this method, this stdin
59    /// will be in the `Closed` state and no further writes will be possible. The staged
60    /// [`ProcessHandle::wait_for_completion`] builder and [`ProcessHandle::kill`] also close
61    /// any still-open stdin automatically; call this method when the child must observe EOF
62    /// earlier.
63    pub fn close(&mut self) {
64        *self = Stdin::Closed;
65    }
66}
67
68/// Represents the running state of a process.
69#[derive(Debug)]
70#[must_use = "Discarding the state defeats the purpose of probing; match on the variants \
71              (or call `is_definitely_running`) to distinguish Running, Terminated, and \
72              Uncertain."]
73pub enum RunningState {
74    /// The process is still running.
75    Running,
76
77    /// The process has terminated with the given exit status.
78    Terminated(ExitStatus),
79
80    /// Reaping the child failed, so the actual state could not be observed.
81    ///
82    /// The conservative reading is "still running": until termination has been confirmed, the
83    /// drop-cleanup and panic guards on the handle are still required, and the process may yet
84    /// produce output. The variant carries the underlying [`io::Error`] so callers can log or
85    /// retry at their discretion; treating it as "not running" risks dropping a still-live child.
86    /// [`RunningState::is_definitely_running`] returns `false` here on purpose so callers cannot
87    /// mistake "we don't know" for "still running" via a single boolean check, but matching on
88    /// the enum is the only way to distinguish [`RunningState::Terminated`] from this case.
89    Uncertain(io::Error),
90}
91
92impl RunningState {
93    /// Returns `true` only when the state is [`RunningState::Running`].
94    ///
95    /// Both [`RunningState::Terminated`] and [`RunningState::Uncertain`] return `false`. The
96    /// asymmetry is intentional: callers who need to distinguish "not running" from "we don't
97    /// know" should match the enum directly. The error carried by
98    /// [`RunningState::Uncertain`] is real and worth surfacing, not silently collapsing.
99    ///
100    /// Even when this returns `false` for an [`RunningState::Uncertain`] state, the safe
101    /// interpretation is still "the process may be running"; do not use this predicate to decide
102    /// whether it is safe to drop the handle. See [`RunningState::Uncertain`].
103    #[must_use]
104    pub fn is_definitely_running(&self) -> bool {
105        matches!(self, RunningState::Running)
106    }
107}
108
109/// A handle to a spawned process with captured stdout/stderr streams.
110///
111/// This type provides methods for waiting on process completion, terminating the process,
112/// and accessing its output streams. By default, processes must be explicitly waited on
113/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
114///
115/// If applicable, a process handle can be wrapped in a [`crate::TerminateOnDrop`] to be terminated
116/// automatically upon being dropped. Note that this requires a multithreaded runtime!
117#[derive(Debug)]
118pub struct ProcessHandle<Stdout, Stderr = Stdout>
119where
120    Stdout: OutputStream,
121    Stderr: OutputStream,
122{
123    pub(crate) name: Cow<'static, str>,
124
125    /// Owns the spawned child as the leader of a process group. On Windows this also owns the
126    /// Job Object the child has been assigned to, so the forceful-kill path can call
127    /// `TerminateJobObject` and reach every descendant the OS has associated with the job
128    /// instead of orphaning the tree the way `Child::start_kill` would.
129    pub(super) group: ProcessGroup,
130
131    std_in: Stdin,
132    std_out_stream: Stdout,
133    std_err_stream: Stderr,
134
135    pub(super) drop_mode: DropMode,
136}
137
138impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
139where
140    Stdout: OutputStream,
141    Stderr: OutputStream,
142{
143    /// Returns the process name configured at spawn time.
144    ///
145    /// The name is set by the [`Process`](crate::Process) builder via either an explicit string
146    /// or an [`AutoName`](crate::AutoName) / [`AutoNameSettings`](crate::AutoNameSettings)
147    /// derivation, and is used in diagnostics and error messages to identify the child.
148    pub fn name(&self) -> &str {
149        &self.name
150    }
151
152    /// Returns a mutable reference to the (potentially already closed) stdin stream.
153    ///
154    /// Use this to write data to the child process's stdin. The stdin stream implements
155    /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
156    ///
157    /// # Example
158    ///
159    /// ```no_run
160    /// # use tokio::process::Command;
161    /// # use tokio_process_tools::{
162    ///     AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process,
163    /// };
164    /// # use tokio::io::AsyncWriteExt;
165    /// # tokio_test::block_on(async {
166    /// // The stream backend does not make a difference here.
167    /// let mut process = Process::new(Command::new("cat"))
168    ///     .name(AutoName::program_only())
169    ///     .stdout_and_stderr(|stream| {
170    ///         stream
171    ///             .broadcast()
172    ///             .lossy_without_backpressure()
173    ///             .no_replay()
174    ///             .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
175    ///             .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
176    ///     })
177    ///     .spawn()
178    ///     .unwrap();
179    ///
180    /// // Write to stdin.
181    /// if let Some(stdin) = process.stdin().as_mut() {
182    ///     stdin.write_all(b"Hello, process!\n").await.unwrap();
183    ///     stdin.flush().await.unwrap();
184    /// }
185    ///
186    /// // Close stdin to signal EOF.
187    /// process.stdin().close();
188    /// # });
189    /// ```
190    pub fn stdin(&mut self) -> &mut Stdin {
191        &mut self.std_in
192    }
193
194    /// Returns a reference to the stdout stream.
195    ///
196    /// For [`BroadcastOutputStream`](crate::BroadcastOutputStream), this allows creating multiple
197    /// concurrent consumers. For
198    /// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream), only one active
199    /// consumer can exist at a time; concurrent attempts return
200    /// [`StreamConsumerError::ActiveConsumer`](crate::StreamConsumerError::ActiveConsumer) from
201    /// `consume(...)` / `consume_async(...)` / `wait_for_line(...)` rather than panicking.
202    pub fn stdout(&self) -> &Stdout {
203        &self.std_out_stream
204    }
205
206    /// Returns a reference to the stderr stream.
207    ///
208    /// For [`BroadcastOutputStream`](crate::BroadcastOutputStream), this allows creating multiple
209    /// concurrent consumers. For
210    /// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream), only one active
211    /// consumer can exist at a time; concurrent attempts return
212    /// [`StreamConsumerError::ActiveConsumer`](crate::StreamConsumerError::ActiveConsumer) from
213    /// `consume(...)` / `consume_async(...)` / `wait_for_line(...)` rather than panicking.
214    pub fn stderr(&self) -> &Stderr {
215        &self.std_err_stream
216    }
217}
218
219impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
220where
221    Stdout: OutputStream,
222    Stderr: OutputStream,
223{
224    /// Forwards to [`ProcessGroup::send_kill`].
225    ///
226    /// Kept as a single named entry point so its callers in [`drop_guard`](super::drop_guard)
227    /// and [`termination`](super::termination) read consistently as "kill via the handle"
228    /// rather than reaching into the wrapped [`ProcessGroup`] directly.
229    pub(super) fn send_kill_signal(&mut self) -> io::Result<()> {
230        self.group.send_kill()
231    }
232
233    /// Returns the OS process ID if the process hasn't exited yet.
234    ///
235    /// Once this process has been polled to completion this will return None.
236    pub fn id(&self) -> Option<u32> {
237        self.group.id()
238    }
239
240    /// Reaps the child's exit status without affecting the drop-cleanup or panic guards.
241    ///
242    /// Callers that observe an exit status are responsible for calling
243    /// [`Self::must_not_be_terminated`] when they take ownership of the lifecycle: the wait,
244    /// terminate, and kill paths do this explicitly when they succeed. `is_running` deliberately
245    /// does not, so a status check never silently disarms the safeguards.
246    ///
247    /// Exposed publicly so callers can wire this as the preflight reaper to
248    /// [`Self::terminate_with_hooks`].
249    #[doc(hidden)]
250    pub fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
251        self.group.try_wait()
252    }
253
254    /// Checks if the process is currently running.
255    ///
256    /// Returns [`RunningState::Running`] if the process is still running,
257    /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
258    /// if the state could not be determined. Each call re-runs the underlying
259    /// `try_wait`, so a transient probing failure observed once does not become permanent.
260    ///
261    /// This is a pure status query: it does not disarm the drop-cleanup or panic guards, even
262    /// when it observes that the process has exited. A handle whose status reads
263    /// [`RunningState::Terminated`] still panics on drop until one of the lifecycle methods has
264    /// closed it. Use [`Self::wait_for_completion`] (the staged builder), [`Self::terminate`],
265    /// or [`Self::kill`] to close the lifecycle through a successful terminal call, or call
266    /// [`Self::must_not_be_terminated`] explicitly to detach the handle without termination.
267    //noinspection RsSelfConvention
268    pub fn is_running(&mut self) -> RunningState {
269        match self.try_reap_exit_status() {
270            Ok(None) => RunningState::Running,
271            Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
272            Err(err) => RunningState::Uncertain(err),
273        }
274    }
275}
276
277impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
278where
279    Stdout: OutputStream,
280    Stderr: OutputStream,
281{
282    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance, stdin handle,
283    /// and stdout/stderr output streams.
284    ///
285    /// The returned [`Child`] no longer owns its `stdin` field because this crate separates piped
286    /// stdin into [`Stdin`]. Keep the returned [`Stdin`] alive to keep the child's stdin pipe open.
287    /// Dropping [`Stdin::Open`] closes the pipe, so the child may observe EOF and exit or otherwise
288    /// change behavior.
289    pub fn into_inner(mut self) -> (Child, Stdin, Stdout, Stderr) {
290        self.must_not_be_terminated();
291        let mut this = ManuallyDrop::new(self);
292
293        // SAFETY: `this` is wrapped in `ManuallyDrop`, so moving out fields with `ptr::read` will
294        // not cause the original `ProcessHandle` destructor to run. We explicitly drop the fields
295        // not returned from this function exactly once before returning the owned parts. The
296        // `ProcessGroup` is consumed by `into_leader()` to produce the bare Tokio `Child`; on
297        // Windows this drops the Job Object handle, which only closes the kernel handle (no
298        // `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` is set) and does not kill the child.
299        unsafe {
300            let group = std::ptr::read(&raw const this.group);
301            let stdin = std::ptr::read(&raw const this.std_in);
302            let stdout = std::ptr::read(&raw const this.std_out_stream);
303            let stderr = std::ptr::read(&raw const this.std_err_stream);
304
305            std::ptr::drop_in_place(&raw mut this.name);
306            std::ptr::drop_in_place(&raw mut this.drop_mode);
307
308            (group.into_leader(), stdin, stdout, stderr)
309        }
310    }
311}
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316    use assertr::prelude::*;
317
318    #[test]
319    fn closed_stdin_reports_closed() {
320        let mut stdin = Stdin::Closed;
321
322        assert_that!(stdin.is_open()).is_false();
323        assert_that!(stdin.as_mut()).is_none();
324
325        stdin.close();
326
327        assert_that!(stdin.is_open()).is_false();
328        assert_that!(stdin.as_mut()).is_none();
329    }
330}