Skip to main content

tokio_process_tools/process_handle/
mod.rs

1//! Spawned-process handle, stdin lifecycle, and process state queries.
2
3mod drop_guard;
4pub(crate) mod output_collection;
5mod replay;
6mod spawn;
7pub(crate) mod termination;
8mod wait;
9
10use crate::output_stream::OutputStream;
11use crate::panic_on_drop::PanicOnDrop;
12use crate::process_handle::termination::GracefulTimeouts;
13use std::borrow::Cow;
14use std::io;
15use std::mem::ManuallyDrop;
16use std::process::ExitStatus;
17use std::time::Duration;
18use tokio::process::Child;
19use tokio::process::ChildStdin;
20
21/// Options for waiting until a process exits, terminating it if waiting fails.
22///
23/// `graceful_timeouts` is a [`GracefulTimeouts`] value, whose shape itself differs per platform
24/// (two timeouts on Unix, one on Windows). Cross-platform callers construct it under cfg gates;
25/// the wait-or-terminate signature stays the same on every supported OS.
26///
27/// This type is only available on Unix and Windows because the underlying `terminate(...)`
28/// machinery is gated to those platforms.
29#[cfg(any(unix, windows))]
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct WaitForCompletionOrTerminateOptions {
32    /// Maximum time to wait before attempting termination.
33    pub wait_timeout: Duration,
34
35    /// Per-platform graceful-shutdown timeout budget. See [`GracefulTimeouts`].
36    pub graceful_timeouts: GracefulTimeouts,
37}
38
39/// Represents the `stdin` stream of a child process.
40///
41/// stdin is always configured as piped, so it starts as `Open` with a [`ChildStdin`] handle
42/// that can be used to write data to the process. It can be explicitly closed by calling
43/// [`Stdin::close`], or implicitly closed by terminal wait helpers such as
44/// [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`], after which it transitions
45/// to the `Closed` state. Note that some operations (kill) close `stdin` early in order to match
46/// the behavior of tokio.
47#[derive(Debug)]
48pub enum Stdin {
49    /// `stdin` is open and available for writing.
50    Open(ChildStdin),
51
52    /// `stdin` has been closed.
53    Closed,
54}
55
56impl Stdin {
57    /// Returns `true` if stdin is open and available for writing.
58    #[must_use]
59    pub fn is_open(&self) -> bool {
60        matches!(self, Stdin::Open(_))
61    }
62
63    /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
64    pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
65        match self {
66            Stdin::Open(stdin) => Some(stdin),
67            Stdin::Closed => None,
68        }
69    }
70
71    /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
72    ///
73    /// This sends `EOF` to the child process. After calling this method, this stdin
74    /// will be in the `Closed` state and no further writes will be possible. Terminal wait helpers
75    /// such as [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`] also close any
76    /// still-open stdin automatically; call this method when the child must observe EOF earlier.
77    pub fn close(&mut self) {
78        *self = Stdin::Closed;
79    }
80}
81
/// Outcome of probing whether a process is still alive.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; its exit status is attached.
    Terminated(ExitStatus),

    /// Reaping the child failed, so the real state could not be observed.
    ///
    /// Read this conservatively as "possibly still running": termination has not been
    /// confirmed, so the drop-cleanup and panic guards on the handle are still required and the
    /// process may yet produce output. The attached [`io::Error`] lets callers log or retry at
    /// their discretion; treating this variant as "not running" risks dropping a child that is
    /// still alive.
    ///
    /// [`RunningState::is_definitely_running`] deliberately answers `false` for this variant so
    /// that a single boolean check can never turn "we don't know" into "still running".
    /// Telling this case apart from [`RunningState::Terminated`] requires matching on the enum.
    Uncertain(io::Error),
}

impl RunningState {
    /// Answers `true` for [`RunningState::Running`] and for nothing else.
    ///
    /// [`RunningState::Terminated`] and [`RunningState::Uncertain`] both map to `false`. That
    /// asymmetry is intentional: callers who must separate "not running" from "we don't know"
    /// should match on the enum, because the error carried by [`RunningState::Uncertain`] is
    /// real and deserves to be surfaced rather than silently collapsed.
    ///
    /// A `false` produced by an [`RunningState::Uncertain`] state still means "the process may
    /// be running". Do not use this predicate to decide whether dropping the handle is safe;
    /// see [`RunningState::Uncertain`].
    #[must_use]
    pub fn is_definitely_running(&self) -> bool {
        matches!(*self, Self::Running)
    }
}
119
120/// Drop-time behavior selected by the lifecycle methods on [`ProcessHandle`].
121///
122/// The state machine has two reachable states because every public lifecycle entry point either
123/// keeps both safeguards on (`Armed`) or turns both off (`Disarmed`). There is no "panic only,
124/// no cleanup" or "cleanup only, no panic" combination: the panic guard makes sense only when
125/// paired with the kill that signals the misuse.
126#[derive(Debug)]
127pub(super) enum DropMode {
128    /// Cleanup is attempted on drop and the panic guard fires when it does.
129    Armed { panic: PanicOnDrop },
130    /// Both cleanup and the panic guard are off. Drop is a no-op for this handle's lifecycle.
131    Disarmed,
132}
133
134/// A handle to a spawned process with captured stdout/stderr streams.
135///
136/// This type provides methods for waiting on process completion, terminating the process,
137/// and accessing its output streams. By default, processes must be explicitly waited on
138/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
139///
140/// If applicable, a process handle can be wrapped in a [`crate::TerminateOnDrop`] to be terminated
141/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
142#[derive(Debug)]
143pub struct ProcessHandle<Stdout, Stderr = Stdout>
144where
145    Stdout: OutputStream,
146    Stderr: OutputStream,
147{
148    pub(crate) name: Cow<'static, str>,
149    child: Child,
150    std_in: Stdin,
151    std_out_stream: Stdout,
152    std_err_stream: Stderr,
153    pub(super) drop_mode: DropMode,
154}
155
156impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
157where
158    Stdout: OutputStream,
159    Stderr: OutputStream,
160{
161    /// Returns a mutable reference to the (potentially already closed) stdin stream.
162    ///
163    /// Use this to write data to the child process's stdin. The stdin stream implements
164    /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
165    ///
166    /// # Example
167    ///
168    /// ```no_run
169    /// # use tokio::process::Command;
170    /// # use tokio_process_tools::{
171    ///     AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process,
172    /// };
173    /// # use tokio::io::AsyncWriteExt;
174    /// # tokio_test::block_on(async {
175    /// // The stream backend does not make a difference here.
176    /// let mut process = Process::new(Command::new("cat"))
177    ///     .name(AutoName::program_only())
178    ///     .stdout_and_stderr(|stream| {
179    ///         stream
180    ///             .broadcast()
181    ///             .best_effort_delivery()
182    ///             .no_replay()
183    ///             .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
184    ///             .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
185    ///     })
186    ///     .spawn()
187    ///     .unwrap();
188    ///
189    /// // Write to stdin.
190    /// if let Some(stdin) = process.stdin().as_mut() {
191    ///     stdin.write_all(b"Hello, process!\n").await.unwrap();
192    ///     stdin.flush().await.unwrap();
193    /// }
194    ///
195    /// // Close stdin to signal EOF.
196    /// process.stdin().close();
197    /// # });
198    /// ```
199    pub fn stdin(&mut self) -> &mut Stdin {
200        &mut self.std_in
201    }
202
203    /// Returns a reference to the stdout stream.
204    ///
205    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
206    /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
207    /// (concurrent attempts will panic with a helpful error message).
208    pub fn stdout(&self) -> &Stdout {
209        &self.std_out_stream
210    }
211
212    /// Returns a reference to the stderr stream.
213    ///
214    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
215    /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
216    /// (concurrent attempts will panic with a helpful error message).
217    pub fn stderr(&self) -> &Stderr {
218        &self.std_err_stream
219    }
220}
221
222impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
223where
224    Stdout: OutputStream,
225    Stderr: OutputStream,
226{
227    /// Returns the OS process ID if the process hasn't exited yet.
228    ///
229    /// Once this process has been polled to completion this will return None.
230    pub fn id(&self) -> Option<u32> {
231        self.child.id()
232    }
233
234    /// Reaps the child's exit status without affecting the drop-cleanup or panic guards.
235    ///
236    /// Callers that observe an exit status are responsible for calling
237    /// [`Self::must_not_be_terminated`] when they take ownership of the lifecycle: the wait,
238    /// terminate, and kill paths do this explicitly when they succeed. `is_running` deliberately
239    /// does not, so a status check never silently disarms the safeguards.
240    pub(super) fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
241        self.child.try_wait()
242    }
243
244    /// Checks if the process is currently running.
245    ///
246    /// Returns [`RunningState::Running`] if the process is still running,
247    /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
248    /// if the state could not be determined. Each call re-runs the underlying
249    /// `try_wait`, so a transient probing failure observed once does not become permanent.
250    ///
251    /// This is a pure status query: it does not disarm the drop-cleanup or panic guards, even
252    /// when it observes that the process has exited. A handle whose status reads
253    /// [`RunningState::Terminated`] still panics on drop until one of the lifecycle methods has
254    /// closed it. Use [`Self::wait_for_completion`], [`Self::terminate`], or [`Self::kill`] to
255    /// close the lifecycle through a successful terminal call, or call
256    /// [`Self::must_not_be_terminated`] explicitly to detach the handle without termination.
257    //noinspection RsSelfConvention
258    pub fn is_running(&mut self) -> RunningState {
259        match self.try_reap_exit_status() {
260            Ok(None) => RunningState::Running,
261            Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
262            Err(err) => RunningState::Uncertain(err),
263        }
264    }
265}
266
267impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
268where
269    Stdout: OutputStream,
270    Stderr: OutputStream,
271{
272    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance, stdin handle,
273    /// and stdout/stderr output streams.
274    ///
275    /// The returned [`Child`] no longer owns its `stdin` field because this crate separates piped
276    /// stdin into [`Stdin`]. Keep the returned [`Stdin`] alive to keep the child's stdin pipe open.
277    /// Dropping [`Stdin::Open`] closes the pipe, so the child may observe EOF and exit or otherwise
278    /// change behavior.
279    pub fn into_inner(mut self) -> (Child, Stdin, Stdout, Stderr) {
280        self.must_not_be_terminated();
281        let mut this = ManuallyDrop::new(self);
282
283        // SAFETY: `this` is wrapped in `ManuallyDrop`, so moving out fields with `ptr::read` will
284        // not cause the original `ProcessHandle` destructor to run. We explicitly drop the fields
285        // not returned from this function exactly once before returning the owned parts.
286        unsafe {
287            let child = std::ptr::read(&raw const this.child);
288            let stdin = std::ptr::read(&raw const this.std_in);
289            let stdout = std::ptr::read(&raw const this.std_out_stream);
290            let stderr = std::ptr::read(&raw const this.std_err_stream);
291
292            std::ptr::drop_in_place(&raw mut this.name);
293            std::ptr::drop_in_place(&raw mut this.drop_mode);
294
295            (child, stdin, stdout, stderr)
296        }
297    }
298}
299
300#[cfg(test)]
301mod tests {
302    use super::*;
303    use crate::{
304        AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process, RunningState,
305        test_support::long_running_command,
306    };
307    use assertr::prelude::*;
308    use std::process::Stdio;
309    use std::time::Duration;
310
311    mod stdin {
312        use super::*;
313
314        #[tokio::test]
315        async fn open_stdin_reports_open_until_closed() {
316            let mut child = long_running_command(Duration::from_secs(5))
317                .stdin(Stdio::piped())
318                .spawn()
319                .unwrap();
320            let child_stdin = child.stdin.take().unwrap();
321            let mut stdin = Stdin::Open(child_stdin);
322
323            assert_that!(stdin.is_open()).is_true();
324            assert_that!(stdin.as_mut().is_some()).is_true();
325
326            stdin.close();
327
328            assert_that!(stdin.is_open()).is_false();
329            assert_that!(stdin.as_mut()).is_none();
330
331            child.kill().await.unwrap();
332        }
333
334        #[test]
335        fn closed_stdin_reports_closed() {
336            let mut stdin = Stdin::Closed;
337
338            assert_that!(stdin.is_open()).is_false();
339            assert_that!(stdin.as_mut()).is_none();
340
341            stdin.close();
342
343            assert_that!(stdin.is_open()).is_false();
344            assert_that!(stdin.as_mut()).is_none();
345        }
346    }
347
348    mod is_running {
349        use super::*;
350
351        #[tokio::test]
352        async fn does_not_disarm_drop_guards_when_process_has_exited() {
353            let mut process = Process::new(long_running_command(Duration::from_millis(50)))
354                .name(AutoName::program_only())
355                .stdout_and_stderr(|stream| {
356                    stream
357                        .broadcast()
358                        .best_effort_delivery()
359                        .no_replay()
360                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
361                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
362                })
363                .spawn()
364                .unwrap();
365
366            tokio::time::sleep(Duration::from_millis(200)).await;
367
368            // Observe the exit through the query API. Even with the child reaped, the drop
369            // guards must remain armed because is_running is documented as a pure status query.
370            let _state = process.is_running();
371
372            assert_that!(process.is_drop_armed()).is_true();
373
374            // Detach explicitly so the test does not panic when `process` drops.
375            process.must_not_be_terminated();
376        }
377
378        #[tokio::test]
379        async fn reports_running_before_wait_and_terminated_after_wait() {
380            let mut process = Process::new(long_running_command(Duration::from_secs(1)))
381                .name(AutoName::program_only())
382                .stdout_and_stderr(|stream| {
383                    stream
384                        .broadcast()
385                        .best_effort_delivery()
386                        .no_replay()
387                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
388                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
389                })
390                .spawn()
391                .unwrap();
392
393            match process.is_running() {
394                RunningState::Running => {}
395                RunningState::Terminated(exit_status) => {
396                    assert_that!(exit_status).fail("process should still be running");
397                }
398                RunningState::Uncertain(_) => {
399                    assert_that!(&process).fail("process state should not be uncertain");
400                }
401            }
402
403            process
404                .wait_for_completion(Duration::from_secs(2))
405                .await
406                .unwrap();
407
408            match process.is_running() {
409                RunningState::Running => {
410                    assert_that!(process).fail("process should not be running anymore");
411                }
412                RunningState::Terminated(exit_status) => {
413                    assert_that!(exit_status.code()).is_some().is_equal_to(0);
414                    assert_that!(exit_status.success()).is_true();
415                }
416                RunningState::Uncertain(_) => {
417                    assert_that!(process).fail("process state should not be uncertain");
418                }
419            }
420        }
421    }
422
423    #[cfg(test)]
424    mod into_inner {
425        use super::*;
426        use crate::LineParsingOptions;
427        use crate::test_support::line_collection_options;
428        use tokio::io::AsyncWriteExt;
429
430        #[tokio::test]
431        async fn returns_stdin_with_pipe_still_open() {
432            let cmd = tokio::process::Command::new("cat");
433            let process = Process::new(cmd)
434                .name("cat")
435                .stdout_and_stderr(|stream| {
436                    stream
437                        .broadcast()
438                        .best_effort_delivery()
439                        .no_replay()
440                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
441                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
442                })
443                .spawn()
444                .unwrap();
445
446            let (mut child, mut stdin, stdout, _stderr) = process.into_inner();
447            assert_that!(child.stdin.is_none()).is_true();
448
449            tokio::time::sleep(Duration::from_millis(100)).await;
450            assert_that!(child.try_wait().unwrap().is_none()).is_true();
451
452            let collector = stdout
453                .collect_lines_into_vec(LineParsingOptions::default(), line_collection_options());
454
455            let Some(stdin_handle) = stdin.as_mut() else {
456                assert_that!(stdin.is_open()).fail("stdin should be returned open");
457                return;
458            };
459            stdin_handle
460                .write_all(b"stdin stayed open\n")
461                .await
462                .unwrap();
463            stdin_handle.flush().await.unwrap();
464
465            stdin.close();
466
467            let status = tokio::time::timeout(Duration::from_secs(2), child.wait())
468                .await
469                .unwrap()
470                .unwrap();
471            assert_that!(status.success()).is_true();
472
473            let collected = collector.wait().await.unwrap();
474            assert_that!(collected.lines().len()).is_equal_to(1);
475            assert_that!(collected[0].as_str()).is_equal_to("stdin stayed open");
476        }
477
478        #[tokio::test]
479        async fn defuses_panic_guard() {
480            let process = Process::new(long_running_command(Duration::from_secs(5)))
481                .name("long-running")
482                .stdout_and_stderr(|stream| {
483                    stream
484                        .broadcast()
485                        .best_effort_delivery()
486                        .no_replay()
487                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
488                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
489                })
490                .spawn()
491                .unwrap();
492
493            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
494            child.kill().await.unwrap();
495            let _status = child.wait().await.unwrap();
496        }
497
498        #[tokio::test]
499        async fn supports_handles_built_with_owned_name() {
500            let process = Process::new(long_running_command(Duration::from_secs(5)))
501                .name(format!("sleeper-{}", 7))
502                .stdout_and_stderr(|stream| {
503                    stream
504                        .broadcast()
505                        .best_effort_delivery()
506                        .no_replay()
507                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
508                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
509                })
510                .spawn()
511                .unwrap();
512
513            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
514            child.kill().await.unwrap();
515            let _status = child.wait().await.unwrap();
516        }
517    }
518}