tokio_process_tools/process_handle/mod.rs
1//! Spawned-process handle, stdin lifecycle, and process state queries.
2
3mod drop_guard;
4mod group;
5pub(crate) mod output_collection;
6mod replay;
7#[cfg(any(unix, windows))]
8mod signal;
9mod spawn;
10pub(crate) mod termination;
11mod wait;
12pub(crate) mod wait_builder;
13
14use crate::output_stream::OutputStream;
15use crate::process_handle::drop_guard::DropMode;
16use crate::process_handle::group::ProcessGroup;
17use std::borrow::Cow;
18use std::io;
19use std::mem::ManuallyDrop;
20use std::process::ExitStatus;
21use tokio::process::Child;
22use tokio::process::ChildStdin;
23
24/// Represents the `stdin` stream of a child process.
25///
26/// stdin is always configured as piped, so it starts as `Open` with a [`ChildStdin`] handle
27/// that can be used to write data to the process. It can be explicitly closed by calling
28/// [`Stdin::close`], or implicitly closed by the staged
29/// [`ProcessHandle::wait_for_completion`] builder and [`ProcessHandle::kill`], after which it
30/// transitions to the `Closed` state. Note that some operations (kill) close `stdin` early in
31/// order to match the behavior of tokio.
32#[derive(Debug)]
33pub enum Stdin {
34 /// `stdin` is open and available for writing.
35 Open(ChildStdin),
36
37 /// `stdin` has been closed.
38 Closed,
39}
40
41impl Stdin {
42 /// Returns `true` if stdin is open and available for writing.
43 #[must_use]
44 pub fn is_open(&self) -> bool {
45 matches!(self, Stdin::Open(_))
46 }
47
48 /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
49 pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
50 match self {
51 Stdin::Open(stdin) => Some(stdin),
52 Stdin::Closed => None,
53 }
54 }
55
56 /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
57 ///
58 /// This sends `EOF` to the child process. After calling this method, this stdin
59 /// will be in the `Closed` state and no further writes will be possible. The staged
60 /// [`ProcessHandle::wait_for_completion`] builder and [`ProcessHandle::kill`] also close
61 /// any still-open stdin automatically; call this method when the child must observe EOF
62 /// earlier.
63 pub fn close(&mut self) {
64 *self = Stdin::Closed;
65 }
66}
67
/// Outcome of probing whether a process is still running.
#[derive(Debug)]
#[must_use = "Discarding the state defeats the purpose of probing; match on the variants \
              (or call `is_definitely_running`) to distinguish Running, Terminated, and \
              Uncertain."]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; the payload is its exit status.
    Terminated(ExitStatus),

    /// The child could not be reaped, so its actual state was not observed.
    ///
    /// Read this conservatively as "still running": as long as termination is unconfirmed, the
    /// handle's drop-cleanup and panic guards are still needed and the process may yet produce
    /// output. The underlying [`io::Error`] is carried along so callers can log it or retry as
    /// they see fit; treating this variant as "not running" risks dropping a still-live child.
    /// [`RunningState::is_definitely_running`] deliberately returns `false` here, so that a
    /// single boolean check cannot mistake "we don't know" for "still running". Telling this
    /// case apart from [`RunningState::Terminated`] requires matching on the enum.
    Uncertain(io::Error),
}

impl RunningState {
    /// Returns `true` for [`RunningState::Running`] and for nothing else.
    ///
    /// [`RunningState::Terminated`] and [`RunningState::Uncertain`] both yield `false`. That
    /// asymmetry is intentional: callers that must separate "not running" from "we don't know"
    /// should match on the enum, because the error inside [`RunningState::Uncertain`] is real
    /// and worth surfacing rather than silently collapsing.
    ///
    /// A `false` result for an [`RunningState::Uncertain`] state still has to be read as "the
    /// process may be running"; do not use this predicate to decide whether it is safe to drop
    /// the handle. See [`RunningState::Uncertain`].
    #[must_use]
    pub fn is_definitely_running(&self) -> bool {
        // Exhaustive on purpose: a newly added variant must be classified here explicitly.
        match self {
            Self::Running => true,
            Self::Terminated(_) | Self::Uncertain(_) => false,
        }
    }
}
108
109/// A handle to a spawned process with captured stdout/stderr streams.
110///
111/// This type provides methods for waiting on process completion, terminating the process,
112/// and accessing its output streams. By default, processes must be explicitly waited on
113/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
114///
115/// If applicable, a process handle can be wrapped in a [`crate::TerminateOnDrop`] to be terminated
116/// automatically upon being dropped. Note that this requires a multithreaded runtime!
117#[derive(Debug)]
118pub struct ProcessHandle<Stdout, Stderr = Stdout>
119where
120 Stdout: OutputStream,
121 Stderr: OutputStream,
122{
123 pub(crate) name: Cow<'static, str>,
124
125 /// Owns the spawned child as the leader of a process group. On Windows this also owns the
126 /// Job Object the child has been assigned to, so the forceful-kill path can call
127 /// `TerminateJobObject` and reach every descendant the OS has associated with the job
128 /// instead of orphaning the tree the way `Child::start_kill` would.
129 pub(super) group: ProcessGroup,
130
131 std_in: Stdin,
132 std_out_stream: Stdout,
133 std_err_stream: Stderr,
134
135 pub(super) drop_mode: DropMode,
136}
137
138impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
139where
140 Stdout: OutputStream,
141 Stderr: OutputStream,
142{
143 /// Returns the process name configured at spawn time.
144 ///
145 /// The name is set by the [`Process`](crate::Process) builder via either an explicit string
146 /// or an [`AutoName`](crate::AutoName) / [`AutoNameSettings`](crate::AutoNameSettings)
147 /// derivation, and is used in diagnostics and error messages to identify the child.
148 pub fn name(&self) -> &str {
149 &self.name
150 }
151
152 /// Returns a mutable reference to the (potentially already closed) stdin stream.
153 ///
154 /// Use this to write data to the child process's stdin. The stdin stream implements
155 /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
156 ///
157 /// # Example
158 ///
159 /// ```no_run
160 /// # use tokio::process::Command;
161 /// # use tokio_process_tools::{
162 /// AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process,
163 /// };
164 /// # use tokio::io::AsyncWriteExt;
165 /// # tokio_test::block_on(async {
166 /// // The stream backend does not make a difference here.
167 /// let mut process = Process::new(Command::new("cat"))
168 /// .name(AutoName::program_only())
169 /// .stdout_and_stderr(|stream| {
170 /// stream
171 /// .broadcast()
172 /// .lossy_without_backpressure()
173 /// .no_replay()
174 /// .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
175 /// .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
176 /// })
177 /// .spawn()
178 /// .unwrap();
179 ///
180 /// // Write to stdin.
181 /// if let Some(stdin) = process.stdin().as_mut() {
182 /// stdin.write_all(b"Hello, process!\n").await.unwrap();
183 /// stdin.flush().await.unwrap();
184 /// }
185 ///
186 /// // Close stdin to signal EOF.
187 /// process.stdin().close();
188 /// # });
189 /// ```
190 pub fn stdin(&mut self) -> &mut Stdin {
191 &mut self.std_in
192 }
193
194 /// Returns a reference to the stdout stream.
195 ///
196 /// For [`BroadcastOutputStream`](crate::BroadcastOutputStream), this allows creating multiple
197 /// concurrent consumers. For
198 /// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream), only one active
199 /// consumer can exist at a time; concurrent attempts return
200 /// [`StreamConsumerError::ActiveConsumer`](crate::StreamConsumerError::ActiveConsumer) from
201 /// `consume(...)` / `consume_async(...)` / `wait_for_line(...)` rather than panicking.
202 pub fn stdout(&self) -> &Stdout {
203 &self.std_out_stream
204 }
205
206 /// Returns a reference to the stderr stream.
207 ///
208 /// For [`BroadcastOutputStream`](crate::BroadcastOutputStream), this allows creating multiple
209 /// concurrent consumers. For
210 /// [`SingleSubscriberOutputStream`](crate::SingleSubscriberOutputStream), only one active
211 /// consumer can exist at a time; concurrent attempts return
212 /// [`StreamConsumerError::ActiveConsumer`](crate::StreamConsumerError::ActiveConsumer) from
213 /// `consume(...)` / `consume_async(...)` / `wait_for_line(...)` rather than panicking.
214 pub fn stderr(&self) -> &Stderr {
215 &self.std_err_stream
216 }
217}
218
219impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
220where
221 Stdout: OutputStream,
222 Stderr: OutputStream,
223{
224 /// Forwards to [`ProcessGroup::send_kill`].
225 ///
226 /// Kept as a single named entry point so its callers in [`drop_guard`](super::drop_guard)
227 /// and [`termination`](super::termination) read consistently as "kill via the handle"
228 /// rather than reaching into the wrapped [`ProcessGroup`] directly.
229 pub(super) fn send_kill_signal(&mut self) -> io::Result<()> {
230 self.group.send_kill()
231 }
232
233 /// Returns the OS process ID if the process hasn't exited yet.
234 ///
235 /// Once this process has been polled to completion this will return None.
236 pub fn id(&self) -> Option<u32> {
237 self.group.id()
238 }
239
240 /// Reaps the child's exit status without affecting the drop-cleanup or panic guards.
241 ///
242 /// Callers that observe an exit status are responsible for calling
243 /// [`Self::must_not_be_terminated`] when they take ownership of the lifecycle: the wait,
244 /// terminate, and kill paths do this explicitly when they succeed. `is_running` deliberately
245 /// does not, so a status check never silently disarms the safeguards.
246 ///
247 /// Exposed publicly so callers can wire this as the preflight reaper to
248 /// [`Self::terminate_with_hooks`].
249 #[doc(hidden)]
250 pub fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
251 self.group.try_wait()
252 }
253
254 /// Checks if the process is currently running.
255 ///
256 /// Returns [`RunningState::Running`] if the process is still running,
257 /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
258 /// if the state could not be determined. Each call re-runs the underlying
259 /// `try_wait`, so a transient probing failure observed once does not become permanent.
260 ///
261 /// This is a pure status query: it does not disarm the drop-cleanup or panic guards, even
262 /// when it observes that the process has exited. A handle whose status reads
263 /// [`RunningState::Terminated`] still panics on drop until one of the lifecycle methods has
264 /// closed it. Use [`Self::wait_for_completion`] (the staged builder), [`Self::terminate`],
265 /// or [`Self::kill`] to close the lifecycle through a successful terminal call, or call
266 /// [`Self::must_not_be_terminated`] explicitly to detach the handle without termination.
267 //noinspection RsSelfConvention
268 pub fn is_running(&mut self) -> RunningState {
269 match self.try_reap_exit_status() {
270 Ok(None) => RunningState::Running,
271 Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
272 Err(err) => RunningState::Uncertain(err),
273 }
274 }
275}
276
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance, stdin handle,
    /// and stdout/stderr output streams.
    ///
    /// The handle is first detached via [`Self::must_not_be_terminated`] and its destructor is
    /// then suppressed, so nothing of the handle's own drop logic runs for the returned parts.
    ///
    /// The returned [`Child`] no longer owns its `stdin` field because this crate separates piped
    /// stdin into [`Stdin`]. Keep the returned [`Stdin`] alive to keep the child's stdin pipe open.
    /// Dropping [`Stdin::Open`] closes the pipe, so the child may observe EOF and exit or otherwise
    /// change behavior.
    ///
    /// # Returns
    ///
    /// A tuple of `(child, stdin, stdout, stderr)`, in that order.
    pub fn into_inner(mut self) -> (Child, Stdin, Stdout, Stderr) {
        // Detach the handle before it is taken apart.
        self.must_not_be_terminated();
        // From here on the handle's destructor never runs; every field is either moved out or
        // dropped by hand below.
        let mut this = ManuallyDrop::new(self);

        // SAFETY: `this` is wrapped in `ManuallyDrop`, so moving out fields with `ptr::read` will
        // not cause the original `ProcessHandle` destructor to run. We explicitly drop the fields
        // not returned from this function exactly once before returning the owned parts. The
        // `ProcessGroup` is consumed by `into_leader()` to produce the bare Tokio `Child`; on
        // Windows this drops the Job Object handle, which only closes the kernel handle (no
        // `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` is set) and does not kill the child.
        unsafe {
            // Move out the fields that are handed back to the caller. After these reads the
            // copies inside `this` must not be touched again.
            let group = std::ptr::read(&raw const this.group);
            let stdin = std::ptr::read(&raw const this.std_in);
            let stdout = std::ptr::read(&raw const this.std_out_stream);
            let stderr = std::ptr::read(&raw const this.std_err_stream);

            // `name` and `drop_mode` are not returned, so they are dropped in place here, once.
            std::ptr::drop_in_place(&raw mut this.name);
            std::ptr::drop_in_place(&raw mut this.drop_mode);

            (group.into_leader(), stdin, stdout, stderr)
        }
    }
}
312
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;

    /// Asserts that `stdin` presents itself as closed through both of its accessors.
    fn assert_reports_closed(stdin: &mut Stdin) {
        assert_that!(stdin.is_open()).is_false();
        assert_that!(stdin.as_mut()).is_none();
    }

    /// A `Stdin` that is already `Closed` reports as closed, both before and after `close()`.
    #[test]
    fn closed_stdin_reports_closed() {
        let mut closed = Stdin::Closed;
        assert_reports_closed(&mut closed);

        // Closing an already closed stdin must leave it closed.
        closed.close();
        assert_reports_closed(&mut closed);
    }
}