Skip to main content

tokio_process_tools/process_handle/
mod.rs

1//! Spawned-process handle, stdin lifecycle, and process state queries.
2
3mod drop_guard;
4pub(crate) mod output_collection;
5mod replay;
6mod spawn;
7mod termination;
8mod wait;
9
10use crate::output_stream::OutputStream;
11use crate::panic_on_drop::PanicOnDrop;
12use std::borrow::Cow;
13use std::io;
14use std::mem::ManuallyDrop;
15use std::process::ExitStatus;
16use std::time::Duration;
17use tokio::process::Child;
18use tokio::process::ChildStdin;
19
/// Options for waiting until a process exits, terminating it if waiting fails.
///
/// Every field is a plain [`Duration`], so the whole struct is `Copy` and can be compared with
/// `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WaitForCompletionOrTerminateOptions {
    /// Maximum time to wait for the process before attempting termination.
    pub wait_timeout: Duration,

    /// Maximum time to wait after the interrupt signal has been sent.
    pub interrupt_timeout: Duration,

    /// Maximum time to wait after the terminate signal has been sent.
    pub terminate_timeout: Duration,
}
32
/// Represents the `stdin` stream of a child process.
///
/// stdin is always configured as piped, so it starts as [`Stdin::Open`], holding a
/// [`ChildStdin`] handle that can be used to write data to the process. It transitions to
/// [`Stdin::Closed`] either explicitly, through [`Stdin::close`], or implicitly, through terminal
/// wait helpers such as [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`].
///
/// Note that some operations (kill) close `stdin` early in order to match the behavior of tokio.
#[derive(Debug)]
pub enum Stdin {
    /// `stdin` is open and available for writing through the contained [`ChildStdin`].
    Open(ChildStdin),

    /// `stdin` has been closed; no handle is held anymore.
    Closed,
}
49
50impl Stdin {
51    /// Returns `true` if stdin is open and available for writing.
52    #[must_use]
53    pub fn is_open(&self) -> bool {
54        matches!(self, Stdin::Open(_))
55    }
56
57    /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
58    pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
59        match self {
60            Stdin::Open(stdin) => Some(stdin),
61            Stdin::Closed => None,
62        }
63    }
64
65    /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
66    ///
67    /// This sends `EOF` to the child process. After calling this method, this stdin
68    /// will be in the `Closed` state and no further writes will be possible. Terminal wait helpers
69    /// such as [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`] also close any
70    /// still-open stdin automatically; call this method when the child must observe EOF earlier.
71    pub fn close(&mut self) {
72        *self = Stdin::Closed;
73    }
74}
75
/// Represents the running state of a process as observed by a single status probe.
#[derive(Debug)]
pub enum RunningState {
    /// The process is still running.
    Running,

    /// The process has terminated with the given exit status.
    Terminated(ExitStatus),

    /// Reaping the child failed, so the actual state could not be observed.
    ///
    /// The conservative reading is "still running": until termination has been confirmed, the
    /// drop-cleanup and panic guards on the handle are still required, and the process may yet
    /// produce output. The variant carries the underlying [`io::Error`] so callers can log or
    /// retry at their discretion; treating it as "not running" risks dropping a still-live child.
    ///
    /// [`RunningState::is_definitely_running`] returns `false` here on purpose, so that callers
    /// cannot mistake "we don't know" for "still running" via a single boolean check. Matching on
    /// the enum is the only way to distinguish [`RunningState::Terminated`] from this case.
    Uncertain(io::Error),
}
96
97impl RunningState {
98    /// Returns `true` only when the state is [`RunningState::Running`].
99    ///
100    /// Both [`RunningState::Terminated`] and [`RunningState::Uncertain`] return `false`. The
101    /// asymmetry is intentional: callers who need to distinguish "not running" from "we don't
102    /// know" should match the enum directly. The error carried by
103    /// [`RunningState::Uncertain`] is real and worth surfacing, not silently collapsing.
104    ///
105    /// Even when this returns `false` for an [`RunningState::Uncertain`] state, the safe
106    /// interpretation is still "the process may be running"; do not use this predicate to decide
107    /// whether it is safe to drop the handle. See [`RunningState::Uncertain`].
108    #[must_use]
109    pub fn is_definitely_running(&self) -> bool {
110        matches!(self, RunningState::Running)
111    }
112}
113
/// Drop-time behavior selected by the lifecycle methods on [`ProcessHandle`].
///
/// The state machine has two reachable states because every public lifecycle entry point either
/// keeps both safeguards on (`Armed`) or turns both off (`Disarmed`). There is no "panic only,
/// no cleanup" or "cleanup only, no panic" combination: the panic guard makes sense only when
/// paired with the kill that signals the misuse.
#[derive(Debug)]
pub(super) enum DropMode {
    /// Cleanup is attempted on drop and the panic guard fires when it does.
    ///
    /// The [`PanicOnDrop`] guard is stored inside the variant, so it exists only while the
    /// handle is armed.
    Armed { panic: PanicOnDrop },
    /// Both cleanup and the panic guard are off. Drop is a no-op for this handle's lifecycle.
    Disarmed,
}
127
/// A handle to a spawned process with captured stdout/stderr streams.
///
/// This type provides methods for waiting on process completion, terminating the process,
/// and accessing its output streams. By default, processes must be explicitly waited on
/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
///
/// If applicable, a process handle can be wrapped in a [`crate::TerminateOnDrop`] to be terminated
/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
///
/// `Stderr` defaults to the `Stdout` stream type, so a single type argument is enough when both
/// streams use the same backend.
#[derive(Debug)]
pub struct ProcessHandle<Stdout, Stderr = Stdout>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Name of the process; either a `'static` string or an owned one.
    pub(crate) name: Cow<'static, str>,
    /// The wrapped [`tokio::process::Child`].
    child: Child,
    /// The child's piped stdin, tracked separately from `child` (see [`Stdin`]).
    std_in: Stdin,
    /// Stream handed out by [`ProcessHandle::stdout`].
    std_out_stream: Stdout,
    /// Stream handed out by [`ProcessHandle::stderr`].
    std_err_stream: Stderr,
    /// Drop-time behavior currently in effect for this handle (see [`DropMode`]).
    pub(super) drop_mode: DropMode,
}
149
150impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
151where
152    Stdout: OutputStream,
153    Stderr: OutputStream,
154{
155    /// Returns a mutable reference to the (potentially already closed) stdin stream.
156    ///
157    /// Use this to write data to the child process's stdin. The stdin stream implements
158    /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
159    ///
160    /// # Example
161    ///
162    /// ```no_run
163    /// # use tokio::process::Command;
164    /// # use tokio_process_tools::{
165    ///     AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process,
166    /// };
167    /// # use tokio::io::AsyncWriteExt;
168    /// # tokio_test::block_on(async {
169    /// // The stream backend does not make a difference here.
170    /// let mut process = Process::new(Command::new("cat"))
171    ///     .name(AutoName::program_only())
172    ///     .stdout_and_stderr(|stream| {
173    ///         stream
174    ///             .broadcast()
175    ///             .best_effort_delivery()
176    ///             .no_replay()
177    ///             .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
178    ///             .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
179    ///     })
180    ///     .spawn()
181    ///     .unwrap();
182    ///
183    /// // Write to stdin.
184    /// if let Some(stdin) = process.stdin().as_mut() {
185    ///     stdin.write_all(b"Hello, process!\n").await.unwrap();
186    ///     stdin.flush().await.unwrap();
187    /// }
188    ///
189    /// // Close stdin to signal EOF.
190    /// process.stdin().close();
191    /// # });
192    /// ```
193    pub fn stdin(&mut self) -> &mut Stdin {
194        &mut self.std_in
195    }
196
197    /// Returns a reference to the stdout stream.
198    ///
199    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
200    /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
201    /// (concurrent attempts will panic with a helpful error message).
202    pub fn stdout(&self) -> &Stdout {
203        &self.std_out_stream
204    }
205
206    /// Returns a reference to the stderr stream.
207    ///
208    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
209    /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
210    /// (concurrent attempts will panic with a helpful error message).
211    pub fn stderr(&self) -> &Stderr {
212        &self.std_err_stream
213    }
214}
215
216impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
217where
218    Stdout: OutputStream,
219    Stderr: OutputStream,
220{
221    /// Returns the OS process ID if the process hasn't exited yet.
222    ///
223    /// Once this process has been polled to completion this will return None.
224    pub fn id(&self) -> Option<u32> {
225        self.child.id()
226    }
227
228    /// Reaps the child's exit status without affecting the drop-cleanup or panic guards.
229    ///
230    /// Callers that observe an exit status are responsible for calling
231    /// [`Self::must_not_be_terminated`] when they take ownership of the lifecycle: the wait,
232    /// terminate, and kill paths do this explicitly when they succeed. `is_running` deliberately
233    /// does not, so a status check never silently disarms the safeguards.
234    pub(super) fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
235        self.child.try_wait()
236    }
237
238    /// Checks if the process is currently running.
239    ///
240    /// Returns [`RunningState::Running`] if the process is still running,
241    /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
242    /// if the state could not be determined. Each call re-runs the underlying
243    /// `try_wait`, so a transient probing failure observed once does not become permanent.
244    ///
245    /// This is a pure status query: it does not disarm the drop-cleanup or panic guards, even
246    /// when it observes that the process has exited. A handle whose status reads
247    /// [`RunningState::Terminated`] still panics on drop until one of the lifecycle methods has
248    /// closed it. Use [`Self::wait_for_completion`], [`Self::terminate`], or [`Self::kill`] to
249    /// close the lifecycle through a successful terminal call, or call
250    /// [`Self::must_not_be_terminated`] explicitly to detach the handle without termination.
251    //noinspection RsSelfConvention
252    pub fn is_running(&mut self) -> RunningState {
253        match self.try_reap_exit_status() {
254            Ok(None) => RunningState::Running,
255            Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
256            Err(err) => RunningState::Uncertain(err),
257        }
258    }
259}
260
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance, stdin handle,
    /// and stdout/stderr output streams.
    ///
    /// The handle is detached via [`Self::must_not_be_terminated`] before it is taken apart, so
    /// the caller becomes responsible for the lifecycle of the returned parts.
    ///
    /// The returned [`Child`] no longer owns its `stdin` field because this crate separates piped
    /// stdin into [`Stdin`]. Keep the returned [`Stdin`] alive to keep the child's stdin pipe open.
    /// Dropping [`Stdin::Open`] closes the pipe, so the child may observe EOF and exit or otherwise
    /// change behavior.
    pub fn into_inner(mut self) -> (Child, Stdin, Stdout, Stderr) {
        // Detach first, while `self` is still an ordinary value.
        self.must_not_be_terminated();
        // From here on no destructor runs for the handle as a whole; every field is either moved
        // out or dropped by hand below.
        let mut this = ManuallyDrop::new(self);

        // SAFETY: `this` is wrapped in `ManuallyDrop`, so moving out fields with `ptr::read` will
        // not cause the original `ProcessHandle` destructor to run. We explicitly drop the fields
        // not returned from this function exactly once before returning the owned parts.
        unsafe {
            // The four returned fields are each read exactly once and never touched again
            // through `this`.
            let child = std::ptr::read(&raw const this.child);
            let stdin = std::ptr::read(&raw const this.std_in);
            let stdout = std::ptr::read(&raw const this.std_out_stream);
            let stderr = std::ptr::read(&raw const this.std_err_stream);

            // The two remaining fields are not handed to the caller, so they are dropped here to
            // avoid leaking them.
            std::ptr::drop_in_place(&raw mut this.name);
            std::ptr::drop_in_place(&raw mut this.drop_mode);

            (child, stdin, stdout, stderr)
        }
    }
}
293
294#[cfg(test)]
295mod tests {
296    use super::*;
297    use crate::{
298        AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process, RunningState,
299        test_support::long_running_command,
300    };
301    use assertr::prelude::*;
302    use std::process::Stdio;
303    use std::time::Duration;
304
305    mod stdin {
306        use super::*;
307
308        #[tokio::test]
309        async fn open_stdin_reports_open_until_closed() {
310            let mut child = long_running_command(Duration::from_secs(5))
311                .stdin(Stdio::piped())
312                .spawn()
313                .unwrap();
314            let child_stdin = child.stdin.take().unwrap();
315            let mut stdin = Stdin::Open(child_stdin);
316
317            assert_that!(stdin.is_open()).is_true();
318            assert_that!(stdin.as_mut().is_some()).is_true();
319
320            stdin.close();
321
322            assert_that!(stdin.is_open()).is_false();
323            assert_that!(stdin.as_mut()).is_none();
324
325            child.kill().await.unwrap();
326        }
327
328        #[test]
329        fn closed_stdin_reports_closed() {
330            let mut stdin = Stdin::Closed;
331
332            assert_that!(stdin.is_open()).is_false();
333            assert_that!(stdin.as_mut()).is_none();
334
335            stdin.close();
336
337            assert_that!(stdin.is_open()).is_false();
338            assert_that!(stdin.as_mut()).is_none();
339        }
340    }
341
342    mod is_running {
343        use super::*;
344
345        #[tokio::test]
346        async fn does_not_disarm_drop_guards_when_process_has_exited() {
347            let mut process = Process::new(long_running_command(Duration::from_millis(50)))
348                .name(AutoName::program_only())
349                .stdout_and_stderr(|stream| {
350                    stream
351                        .broadcast()
352                        .best_effort_delivery()
353                        .no_replay()
354                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
355                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
356                })
357                .spawn()
358                .unwrap();
359
360            tokio::time::sleep(Duration::from_millis(200)).await;
361
362            // Observe the exit through the query API. Even with the child reaped, the drop
363            // guards must remain armed because is_running is documented as a pure status query.
364            let _state = process.is_running();
365
366            assert_that!(process.is_drop_armed()).is_true();
367
368            // Detach explicitly so the test does not panic when `process` drops.
369            process.must_not_be_terminated();
370        }
371
372        #[tokio::test]
373        async fn reports_running_before_wait_and_terminated_after_wait() {
374            let mut process = Process::new(long_running_command(Duration::from_secs(1)))
375                .name(AutoName::program_only())
376                .stdout_and_stderr(|stream| {
377                    stream
378                        .broadcast()
379                        .best_effort_delivery()
380                        .no_replay()
381                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
382                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
383                })
384                .spawn()
385                .unwrap();
386
387            match process.is_running() {
388                RunningState::Running => {}
389                RunningState::Terminated(exit_status) => {
390                    assert_that!(exit_status).fail("process should still be running");
391                }
392                RunningState::Uncertain(_) => {
393                    assert_that!(&process).fail("process state should not be uncertain");
394                }
395            }
396
397            process
398                .wait_for_completion(Duration::from_secs(2))
399                .await
400                .unwrap();
401
402            match process.is_running() {
403                RunningState::Running => {
404                    assert_that!(process).fail("process should not be running anymore");
405                }
406                RunningState::Terminated(exit_status) => {
407                    assert_that!(exit_status.code()).is_some().is_equal_to(0);
408                    assert_that!(exit_status.success()).is_true();
409                }
410                RunningState::Uncertain(_) => {
411                    assert_that!(process).fail("process state should not be uncertain");
412                }
413            }
414        }
415    }
416
417    #[cfg(test)]
418    mod into_inner {
419        use super::*;
420        use crate::LineParsingOptions;
421        use crate::test_support::line_collection_options;
422        use tokio::io::AsyncWriteExt;
423
424        #[tokio::test]
425        async fn returns_stdin_with_pipe_still_open() {
426            let cmd = tokio::process::Command::new("cat");
427            let process = Process::new(cmd)
428                .name("cat")
429                .stdout_and_stderr(|stream| {
430                    stream
431                        .broadcast()
432                        .best_effort_delivery()
433                        .no_replay()
434                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
435                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
436                })
437                .spawn()
438                .unwrap();
439
440            let (mut child, mut stdin, stdout, _stderr) = process.into_inner();
441            assert_that!(child.stdin.is_none()).is_true();
442
443            tokio::time::sleep(Duration::from_millis(100)).await;
444            assert_that!(child.try_wait().unwrap().is_none()).is_true();
445
446            let collector = stdout
447                .collect_lines_into_vec(LineParsingOptions::default(), line_collection_options());
448
449            let Some(stdin_handle) = stdin.as_mut() else {
450                assert_that!(stdin.is_open()).fail("stdin should be returned open");
451                return;
452            };
453            stdin_handle
454                .write_all(b"stdin stayed open\n")
455                .await
456                .unwrap();
457            stdin_handle.flush().await.unwrap();
458
459            stdin.close();
460
461            let status = tokio::time::timeout(Duration::from_secs(2), child.wait())
462                .await
463                .unwrap()
464                .unwrap();
465            assert_that!(status.success()).is_true();
466
467            let collected = collector.wait().await.unwrap();
468            assert_that!(collected.lines().len()).is_equal_to(1);
469            assert_that!(collected[0].as_str()).is_equal_to("stdin stayed open");
470        }
471
472        #[tokio::test]
473        async fn defuses_panic_guard() {
474            let process = Process::new(long_running_command(Duration::from_secs(5)))
475                .name("long-running")
476                .stdout_and_stderr(|stream| {
477                    stream
478                        .broadcast()
479                        .best_effort_delivery()
480                        .no_replay()
481                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
482                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
483                })
484                .spawn()
485                .unwrap();
486
487            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
488            child.kill().await.unwrap();
489            let _status = child.wait().await.unwrap();
490        }
491
492        #[tokio::test]
493        async fn supports_handles_built_with_owned_name() {
494            let process = Process::new(long_running_command(Duration::from_secs(5)))
495                .name(format!("sleeper-{}", 7))
496                .stdout_and_stderr(|stream| {
497                    stream
498                        .broadcast()
499                        .best_effort_delivery()
500                        .no_replay()
501                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
502                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
503                })
504                .spawn()
505                .unwrap();
506
507            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
508            child.kill().await.unwrap();
509            let _status = child.wait().await.unwrap();
510        }
511    }
512}