tokio_process_tools/process_handle/mod.rs
1//! Spawned-process handle, stdin lifecycle, and process state queries.
2
3mod drop_guard;
4pub(crate) mod output_collection;
5mod replay;
6mod spawn;
7pub(crate) mod termination;
8mod wait;
9
10use crate::output_stream::OutputStream;
11use crate::panic_on_drop::PanicOnDrop;
12use crate::process_handle::termination::GracefulTimeouts;
13use std::borrow::Cow;
14use std::io;
15use std::mem::ManuallyDrop;
16use std::process::ExitStatus;
17use std::time::Duration;
18use tokio::process::Child;
19use tokio::process::ChildStdin;
20
/// Options for waiting until a process exits, terminating it if waiting fails.
///
/// `graceful_timeouts` is a [`GracefulTimeouts`] value, whose shape itself differs per platform
/// (two timeouts on Unix, one on Windows). Cross-platform callers construct it under cfg gates;
/// the wait-or-terminate signature stays the same on every supported OS.
///
/// This type is only available on Unix and Windows because the underlying `terminate(...)`
/// machinery is gated to those platforms.
///
/// The type derives `Copy`, so one options value can be passed to several calls by value.
#[cfg(any(unix, windows))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WaitForCompletionOrTerminateOptions {
    /// Maximum time to wait for the process to exit before attempting termination.
    pub wait_timeout: Duration,

    /// Per-platform graceful-shutdown timeout budget. See [`GracefulTimeouts`] for the
    /// platform-specific fields.
    pub graceful_timeouts: GracefulTimeouts,
}
38
39/// Represents the `stdin` stream of a child process.
40///
41/// stdin is always configured as piped, so it starts as `Open` with a [`ChildStdin`] handle
42/// that can be used to write data to the process. It can be explicitly closed by calling
43/// [`Stdin::close`], or implicitly closed by terminal wait helpers such as
44/// [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`], after which it transitions
45/// to the `Closed` state. Note that some operations (kill) close `stdin` early in order to match
46/// the behavior of tokio.
47#[derive(Debug)]
48pub enum Stdin {
49 /// `stdin` is open and available for writing.
50 Open(ChildStdin),
51
52 /// `stdin` has been closed.
53 Closed,
54}
55
56impl Stdin {
57 /// Returns `true` if stdin is open and available for writing.
58 #[must_use]
59 pub fn is_open(&self) -> bool {
60 matches!(self, Stdin::Open(_))
61 }
62
63 /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
64 pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
65 match self {
66 Stdin::Open(stdin) => Some(stdin),
67 Stdin::Closed => None,
68 }
69 }
70
71 /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
72 ///
73 /// This sends `EOF` to the child process. After calling this method, this stdin
74 /// will be in the `Closed` state and no further writes will be possible. Terminal wait helpers
75 /// such as [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`] also close any
76 /// still-open stdin automatically; call this method when the child must observe EOF earlier.
77 pub fn close(&mut self) {
78 *self = Stdin::Closed;
79 }
80}
81
/// The observed running state of a process.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; the payload is its exit status.
    Terminated(ExitStatus),

    /// Reaping the child failed, so its real state is unknown.
    ///
    /// Read this conservatively as "still running": as long as termination is unconfirmed, the
    /// handle's drop-cleanup and panic guards remain necessary and the process may still produce
    /// output. The underlying [`io::Error`] is carried along so callers can log it or retry as
    /// they see fit; assuming "not running" risks dropping a child that is still alive.
    /// [`RunningState::is_definitely_running`] deliberately answers `false` for this variant so
    /// that "we don't know" can never be confused with "still running" through one boolean
    /// check. Telling this case apart from [`RunningState::Terminated`] requires matching on the
    /// enum.
    Uncertain(io::Error),
}

impl RunningState {
    /// Answers `true` for [`RunningState::Running`] and for nothing else.
    ///
    /// [`RunningState::Terminated`] and [`RunningState::Uncertain`] both map to `false`. That
    /// asymmetry is on purpose: callers that must separate "not running" from "we don't know"
    /// should match on the enum, because the error inside [`RunningState::Uncertain`] is real
    /// and deserves to be surfaced instead of being collapsed away.
    ///
    /// A `false` result for [`RunningState::Uncertain`] still means "the process may be
    /// running"; never use this predicate to decide that dropping the handle is safe. See
    /// [`RunningState::Uncertain`].
    #[must_use]
    pub fn is_definitely_running(&self) -> bool {
        // Exhaustive on purpose: a new variant must make an explicit choice here.
        match self {
            Self::Running => true,
            Self::Terminated(_) | Self::Uncertain(_) => false,
        }
    }
}
119
/// Drop-time behavior selected by the lifecycle methods on [`ProcessHandle`].
///
/// The state machine has two reachable states because every public lifecycle entry point either
/// keeps both safeguards on (`Armed`) or turns both off (`Disarmed`). There is no "panic only,
/// no cleanup" or "cleanup only, no panic" combination: the panic guard makes sense only when
/// paired with the kill that signals the misuse.
#[derive(Debug)]
pub(super) enum DropMode {
    /// Cleanup is attempted on drop and the panic guard fires when it does.
    ///
    /// The `panic` field owns the [`PanicOnDrop`] guard for as long as the handle is armed.
    // NOTE(review): how the guard is defused on the transition to `Disarmed` is implemented
    // outside this file — confirm against the lifecycle methods.
    Armed { panic: PanicOnDrop },
    /// Both cleanup and the panic guard are off. Drop is a no-op for this handle's lifecycle.
    Disarmed,
}
133
/// A handle to a spawned process with captured stdout/stderr streams.
///
/// This type provides methods for waiting on process completion, terminating the process,
/// and accessing its output streams. By default, processes must be explicitly waited on
/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
///
/// If applicable, a process handle can be wrapped in a [`crate::TerminateOnDrop`] to be terminated
/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
///
/// The `Stderr` stream type defaults to the `Stdout` stream type, so `ProcessHandle<S>` is a
/// handle whose two streams share the same backend.
#[derive(Debug)]
pub struct ProcessHandle<Stdout, Stderr = Stdout>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    // Name associated with the process; presumably the value configured through the builder's
    // `.name(...)` call — confirm against the spawn module.
    pub(crate) name: Cow<'static, str>,
    // The underlying tokio child. Its piped stdin is held separately in `std_in`.
    child: Child,
    // Stdin of the child, tracked as open/closed independently of `child`.
    std_in: Stdin,
    // Captured stdout stream, exposed through `stdout()`.
    std_out_stream: Stdout,
    // Captured stderr stream, exposed through `stderr()`.
    std_err_stream: Stderr,
    // Whether the drop-time safeguards (cleanup + panic guard) are currently armed.
    pub(super) drop_mode: DropMode,
}
155
156impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
157where
158 Stdout: OutputStream,
159 Stderr: OutputStream,
160{
161 /// Returns a mutable reference to the (potentially already closed) stdin stream.
162 ///
163 /// Use this to write data to the child process's stdin. The stdin stream implements
164 /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
165 ///
166 /// # Example
167 ///
168 /// ```no_run
169 /// # use tokio::process::Command;
170 /// # use tokio_process_tools::{
171 /// AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process,
172 /// };
173 /// # use tokio::io::AsyncWriteExt;
174 /// # tokio_test::block_on(async {
175 /// // The stream backend does not make a difference here.
176 /// let mut process = Process::new(Command::new("cat"))
177 /// .name(AutoName::program_only())
178 /// .stdout_and_stderr(|stream| {
179 /// stream
180 /// .broadcast()
181 /// .best_effort_delivery()
182 /// .no_replay()
183 /// .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
184 /// .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
185 /// })
186 /// .spawn()
187 /// .unwrap();
188 ///
189 /// // Write to stdin.
190 /// if let Some(stdin) = process.stdin().as_mut() {
191 /// stdin.write_all(b"Hello, process!\n").await.unwrap();
192 /// stdin.flush().await.unwrap();
193 /// }
194 ///
195 /// // Close stdin to signal EOF.
196 /// process.stdin().close();
197 /// # });
198 /// ```
199 pub fn stdin(&mut self) -> &mut Stdin {
200 &mut self.std_in
201 }
202
203 /// Returns a reference to the stdout stream.
204 ///
205 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
206 /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
207 /// (concurrent attempts will panic with a helpful error message).
208 pub fn stdout(&self) -> &Stdout {
209 &self.std_out_stream
210 }
211
212 /// Returns a reference to the stderr stream.
213 ///
214 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
215 /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
216 /// (concurrent attempts will panic with a helpful error message).
217 pub fn stderr(&self) -> &Stderr {
218 &self.std_err_stream
219 }
220}
221
222impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
223where
224 Stdout: OutputStream,
225 Stderr: OutputStream,
226{
227 /// Returns the OS process ID if the process hasn't exited yet.
228 ///
229 /// Once this process has been polled to completion this will return None.
230 pub fn id(&self) -> Option<u32> {
231 self.child.id()
232 }
233
234 /// Reaps the child's exit status without affecting the drop-cleanup or panic guards.
235 ///
236 /// Callers that observe an exit status are responsible for calling
237 /// [`Self::must_not_be_terminated`] when they take ownership of the lifecycle: the wait,
238 /// terminate, and kill paths do this explicitly when they succeed. `is_running` deliberately
239 /// does not, so a status check never silently disarms the safeguards.
240 pub(super) fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
241 self.child.try_wait()
242 }
243
244 /// Checks if the process is currently running.
245 ///
246 /// Returns [`RunningState::Running`] if the process is still running,
247 /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
248 /// if the state could not be determined. Each call re-runs the underlying
249 /// `try_wait`, so a transient probing failure observed once does not become permanent.
250 ///
251 /// This is a pure status query: it does not disarm the drop-cleanup or panic guards, even
252 /// when it observes that the process has exited. A handle whose status reads
253 /// [`RunningState::Terminated`] still panics on drop until one of the lifecycle methods has
254 /// closed it. Use [`Self::wait_for_completion`], [`Self::terminate`], or [`Self::kill`] to
255 /// close the lifecycle through a successful terminal call, or call
256 /// [`Self::must_not_be_terminated`] explicitly to detach the handle without termination.
257 //noinspection RsSelfConvention
258 pub fn is_running(&mut self) -> RunningState {
259 match self.try_reap_exit_status() {
260 Ok(None) => RunningState::Running,
261 Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
262 Err(err) => RunningState::Uncertain(err),
263 }
264 }
265}
266
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance, stdin handle,
    /// and stdout/stderr output streams.
    ///
    /// The returned [`Child`] no longer owns its `stdin` field because this crate separates piped
    /// stdin into [`Stdin`]. Keep the returned [`Stdin`] alive to keep the child's stdin pipe open.
    /// Dropping [`Stdin::Open`] closes the pipe, so the child may observe EOF and exit or otherwise
    /// change behavior.
    ///
    /// The handle is detached via [`Self::must_not_be_terminated`] before it is taken apart, so
    /// from this point on the caller is responsible for waiting on or terminating the child.
    ///
    /// Returns the tuple `(child, stdin, stdout, stderr)`.
    pub fn into_inner(mut self) -> (Child, Stdin, Stdout, Stderr) {
        // Detach first, while `self` is still an ordinary value with its guards in place.
        self.must_not_be_terminated();
        // From here on the handle's own destructor never runs; every field is either moved out
        // or dropped by hand below.
        let mut this = ManuallyDrop::new(self);

        // SAFETY: `this` is wrapped in `ManuallyDrop`, so moving out fields with `ptr::read` will
        // not cause the original `ProcessHandle` destructor to run. We explicitly drop the fields
        // not returned from this function exactly once before returning the owned parts.
        unsafe {
            // Fields handed back to the caller: each is read (moved out) exactly once.
            let child = std::ptr::read(&raw const this.child);
            let stdin = std::ptr::read(&raw const this.std_in);
            let stdout = std::ptr::read(&raw const this.std_out_stream);
            let stderr = std::ptr::read(&raw const this.std_err_stream);

            // Fields that are not returned: dropped in place so they are not leaked.
            std::ptr::drop_in_place(&raw mut this.name);
            std::ptr::drop_in_place(&raw mut this.drop_mode);

            (child, stdin, stdout, stderr)
        }
    }
}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process, RunningState,
        test_support::long_running_command,
    };
    use assertr::prelude::*;
    use std::process::Stdio;
    use std::time::Duration;

    /// Tests for the standalone [`Stdin`] state wrapper.
    mod stdin {
        use super::*;

        // An `Open` stdin reports open and yields its handle until `close()` flips it to closed.
        #[tokio::test]
        async fn open_stdin_reports_open_until_closed() {
            let mut child = long_running_command(Duration::from_secs(5))
                .stdin(Stdio::piped())
                .spawn()
                .unwrap();
            let child_stdin = child.stdin.take().unwrap();
            let mut stdin = Stdin::Open(child_stdin);

            assert_that!(stdin.is_open()).is_true();
            assert_that!(stdin.as_mut().is_some()).is_true();

            stdin.close();

            assert_that!(stdin.is_open()).is_false();
            assert_that!(stdin.as_mut()).is_none();

            child.kill().await.unwrap();
        }

        // A `Closed` stdin stays closed, and closing it again changes nothing.
        #[test]
        fn closed_stdin_reports_closed() {
            let mut stdin = Stdin::Closed;

            assert_that!(stdin.is_open()).is_false();
            assert_that!(stdin.as_mut()).is_none();

            stdin.close();

            assert_that!(stdin.is_open()).is_false();
            assert_that!(stdin.as_mut()).is_none();
        }
    }

    /// Tests for [`ProcessHandle::is_running`].
    mod is_running {
        use super::*;

        // Querying the state of an exited process must leave the drop guards armed.
        #[tokio::test]
        async fn does_not_disarm_drop_guards_when_process_has_exited() {
            let mut process = Process::new(long_running_command(Duration::from_millis(50)))
                .name(AutoName::program_only())
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            tokio::time::sleep(Duration::from_millis(200)).await;

            // Observe the exit through the query API. Even with the child reaped, the drop
            // guards must remain armed because is_running is documented as a pure status query.
            let _state = process.is_running();

            assert_that!(process.is_drop_armed()).is_true();

            // Detach explicitly so the test does not panic when `process` drops.
            process.must_not_be_terminated();
        }

        // The state reads `Running` before the wait and `Terminated` (success) after it.
        #[tokio::test]
        async fn reports_running_before_wait_and_terminated_after_wait() {
            let mut process = Process::new(long_running_command(Duration::from_secs(1)))
                .name(AutoName::program_only())
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            match process.is_running() {
                RunningState::Running => {}
                RunningState::Terminated(exit_status) => {
                    assert_that!(exit_status).fail("process should still be running");
                }
                RunningState::Uncertain(_) => {
                    assert_that!(&process).fail("process state should not be uncertain");
                }
            }

            process
                .wait_for_completion(Duration::from_secs(2))
                .await
                .unwrap();

            match process.is_running() {
                RunningState::Running => {
                    assert_that!(process).fail("process should not be running anymore");
                }
                RunningState::Terminated(exit_status) => {
                    assert_that!(exit_status.code()).is_some().is_equal_to(0);
                    assert_that!(exit_status.success()).is_true();
                }
                RunningState::Uncertain(_) => {
                    assert_that!(process).fail("process state should not be uncertain");
                }
            }
        }
    }

    /// Tests for [`ProcessHandle::into_inner`].
    // No extra `#[cfg(test)]` here: the enclosing `tests` module is already test-only.
    mod into_inner {
        use super::*;
        use crate::LineParsingOptions;
        use crate::test_support::line_collection_options;
        use tokio::io::AsyncWriteExt;

        // The returned `Stdin` keeps the pipe open: the child stays alive, echoes what is
        // written, and exits only once stdin is closed.
        #[tokio::test]
        async fn returns_stdin_with_pipe_still_open() {
            let cmd = tokio::process::Command::new("cat");
            let process = Process::new(cmd)
                .name("cat")
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            let (mut child, mut stdin, stdout, _stderr) = process.into_inner();
            assert_that!(child.stdin.is_none()).is_true();

            tokio::time::sleep(Duration::from_millis(100)).await;
            assert_that!(child.try_wait().unwrap().is_none()).is_true();

            let collector = stdout
                .collect_lines_into_vec(LineParsingOptions::default(), line_collection_options());

            let Some(stdin_handle) = stdin.as_mut() else {
                assert_that!(stdin.is_open()).fail("stdin should be returned open");
                return;
            };
            stdin_handle
                .write_all(b"stdin stayed open\n")
                .await
                .unwrap();
            stdin_handle.flush().await.unwrap();

            stdin.close();

            let status = tokio::time::timeout(Duration::from_secs(2), child.wait())
                .await
                .unwrap()
                .unwrap();
            assert_that!(status.success()).is_true();

            let collected = collector.wait().await.unwrap();
            assert_that!(collected.lines().len()).is_equal_to(1);
            assert_that!(collected[0].as_str()).is_equal_to("stdin stayed open");
        }

        // Taking a still-running handle apart must not trip the panic-on-drop guard.
        #[tokio::test]
        async fn defuses_panic_guard() {
            let process = Process::new(long_running_command(Duration::from_secs(5)))
                .name("long-running")
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
            child.kill().await.unwrap();
            let _status = child.wait().await.unwrap();
        }

        // Same as above, but with a heap-allocated (`Cow::Owned`) name that `into_inner` must
        // drop by hand.
        #[tokio::test]
        async fn supports_handles_built_with_owned_name() {
            let process = Process::new(long_running_command(Duration::from_secs(5)))
                .name(format!("sleeper-{}", 7))
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
            child.kill().await.unwrap();
            let _status = child.wait().await.unwrap();
        }
    }
}