tokio_process_tools/process_handle/mod.rs
1//! Spawned-process handle, stdin lifecycle, and process state queries.
2
3mod drop_guard;
4pub(crate) mod output_collection;
5mod replay;
6mod spawn;
7mod termination;
8mod wait;
9
10use crate::output_stream::OutputStream;
11use crate::panic_on_drop::PanicOnDrop;
12use std::borrow::Cow;
13use std::io;
14use std::mem::ManuallyDrop;
15use std::process::ExitStatus;
16use std::time::Duration;
17use tokio::process::Child;
18use tokio::process::ChildStdin;
19
/// Options for waiting until a process exits, terminating it if waiting fails.
///
/// The three fields are independent time budgets, one per phase described in the field docs:
/// the initial wait, the phase after the interrupt signal, and the phase after the terminate
/// signal. The type is a plain bundle of [`Duration`] values, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WaitForCompletionOrTerminateOptions {
    /// Maximum time to wait before attempting termination.
    pub wait_timeout: Duration,

    /// Maximum time to wait after sending the interrupt signal.
    pub interrupt_timeout: Duration,

    /// Maximum time to wait after sending the terminate signal.
    pub terminate_timeout: Duration,
}
32
/// Represents the `stdin` stream of a child process.
///
/// `stdin` is always configured as piped, so it starts as `Open` with a [`ChildStdin`] handle
/// that can be used to write data to the process. It can be explicitly closed by calling
/// [`Stdin::close`], or implicitly closed by terminal wait helpers such as
/// [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`], after which it transitions
/// to the `Closed` state. Note that some operations (kill) close `stdin` early in order to match
/// the behavior of tokio.
#[derive(Debug)]
pub enum Stdin {
    /// `stdin` is open and available for writing.
    ///
    /// The variant owns the pipe handle; dropping it closes the pipe, so the child may observe
    /// EOF.
    Open(ChildStdin),

    /// `stdin` has been closed.
    Closed,
}
49
50impl Stdin {
51 /// Returns `true` if stdin is open and available for writing.
52 #[must_use]
53 pub fn is_open(&self) -> bool {
54 matches!(self, Stdin::Open(_))
55 }
56
57 /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
58 pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
59 match self {
60 Stdin::Open(stdin) => Some(stdin),
61 Stdin::Closed => None,
62 }
63 }
64
65 /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
66 ///
67 /// This sends `EOF` to the child process. After calling this method, this stdin
68 /// will be in the `Closed` state and no further writes will be possible. Terminal wait helpers
69 /// such as [`ProcessHandle::wait_for_completion`] and [`ProcessHandle::kill`] also close any
70 /// still-open stdin automatically; call this method when the child must observe EOF earlier.
71 pub fn close(&mut self) {
72 *self = Stdin::Closed;
73 }
74}
75
/// The observed run state of a process at the moment it was queried.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; the payload is its exit status.
    Terminated(ExitStatus),

    /// The child could not be reaped, so its real state is unknown.
    ///
    /// Read this conservatively as "still running": as long as termination has not been
    /// confirmed, the handle's drop-cleanup and panic guards are still needed and the process
    /// may keep producing output. The underlying [`io::Error`] is carried along so callers can
    /// log it or retry as they see fit; treating this as "not running" risks dropping a child
    /// that is still alive.
    ///
    /// [`RunningState::is_definitely_running`] deliberately answers `false` for this variant so
    /// that a single boolean check can never turn "we don't know" into "still running".
    /// Telling this case apart from [`RunningState::Terminated`] requires matching on the enum.
    Uncertain(io::Error),
}

impl RunningState {
    /// Returns `true` for [`RunningState::Running`] and for nothing else.
    ///
    /// [`RunningState::Terminated`] and [`RunningState::Uncertain`] both yield `false`. That
    /// asymmetry is on purpose: code that must separate "not running" from "we don't know"
    /// should match on the enum, because the error inside [`RunningState::Uncertain`] is real
    /// and deserves to be surfaced rather than silently collapsed.
    ///
    /// A `false` result for an [`RunningState::Uncertain`] state still means "the process may
    /// be running". Do not use this predicate to decide whether the handle can be dropped
    /// safely. See [`RunningState::Uncertain`].
    #[must_use]
    pub fn is_definitely_running(&self) -> bool {
        match self {
            RunningState::Running => true,
            RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
        }
    }
}
113
/// Drop-time behavior selected by the lifecycle methods on [`ProcessHandle`].
///
/// The state machine has two reachable states because every public lifecycle entry point either
/// keeps both safeguards on (`Armed`) or turns both off (`Disarmed`). There is no "panic only,
/// no cleanup" or "cleanup only, no panic" combination: the panic guard makes sense only when
/// paired with the kill that signals the misuse.
///
/// Encoding this as an enum rather than two independent flags makes the mixed combinations
/// unrepresentable.
#[derive(Debug)]
pub(super) enum DropMode {
    /// Cleanup is attempted on drop and the panic guard fires when it does.
    ///
    /// The [`PanicOnDrop`] guard is owned by this variant, so it exists only while armed.
    Armed { panic: PanicOnDrop },
    /// Both cleanup and the panic guard are off. Drop is a no-op for this handle's lifecycle.
    Disarmed,
}
127
/// A handle to a spawned process with captured stdout/stderr streams.
///
/// This type provides methods for waiting on process completion, terminating the process,
/// and accessing its output streams. By default, processes must be explicitly waited on
/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
///
/// If applicable, a process handle can be wrapped in a [`crate::TerminateOnDrop`] to be terminated
/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
///
/// The `Stderr` type parameter defaults to `Stdout`, so a handle whose two streams share one
/// backend type can be named with a single type argument.
#[derive(Debug)]
pub struct ProcessHandle<Stdout, Stderr = Stdout>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Name given to the process; either a borrowed `'static` string or an owned one.
    pub(crate) name: Cow<'static, str>,
    /// The underlying tokio child. Its piped stdin is held separately in `std_in`.
    child: Child,
    /// Write side of the child's stdin, or the closed marker once it has been dropped.
    std_in: Stdin,
    /// Captured stdout of the child.
    std_out_stream: Stdout,
    /// Captured stderr of the child.
    std_err_stream: Stderr,
    /// Whether dropping this handle performs cleanup and triggers the panic guard.
    pub(super) drop_mode: DropMode,
}
149
150impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
151where
152 Stdout: OutputStream,
153 Stderr: OutputStream,
154{
155 /// Returns a mutable reference to the (potentially already closed) stdin stream.
156 ///
157 /// Use this to write data to the child process's stdin. The stdin stream implements
158 /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
159 ///
160 /// # Example
161 ///
162 /// ```no_run
163 /// # use tokio::process::Command;
164 /// # use tokio_process_tools::{
165 /// AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process,
166 /// };
167 /// # use tokio::io::AsyncWriteExt;
168 /// # tokio_test::block_on(async {
169 /// // The stream backend does not make a difference here.
170 /// let mut process = Process::new(Command::new("cat"))
171 /// .name(AutoName::program_only())
172 /// .stdout_and_stderr(|stream| {
173 /// stream
174 /// .broadcast()
175 /// .best_effort_delivery()
176 /// .no_replay()
177 /// .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
178 /// .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
179 /// })
180 /// .spawn()
181 /// .unwrap();
182 ///
183 /// // Write to stdin.
184 /// if let Some(stdin) = process.stdin().as_mut() {
185 /// stdin.write_all(b"Hello, process!\n").await.unwrap();
186 /// stdin.flush().await.unwrap();
187 /// }
188 ///
189 /// // Close stdin to signal EOF.
190 /// process.stdin().close();
191 /// # });
192 /// ```
193 pub fn stdin(&mut self) -> &mut Stdin {
194 &mut self.std_in
195 }
196
197 /// Returns a reference to the stdout stream.
198 ///
199 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
200 /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
201 /// (concurrent attempts will panic with a helpful error message).
202 pub fn stdout(&self) -> &Stdout {
203 &self.std_out_stream
204 }
205
206 /// Returns a reference to the stderr stream.
207 ///
208 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
209 /// For `SingleSubscriberOutputStream`, only one active consumer can exist at a time
210 /// (concurrent attempts will panic with a helpful error message).
211 pub fn stderr(&self) -> &Stderr {
212 &self.std_err_stream
213 }
214}
215
216impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
217where
218 Stdout: OutputStream,
219 Stderr: OutputStream,
220{
221 /// Returns the OS process ID if the process hasn't exited yet.
222 ///
223 /// Once this process has been polled to completion this will return None.
224 pub fn id(&self) -> Option<u32> {
225 self.child.id()
226 }
227
228 /// Reaps the child's exit status without affecting the drop-cleanup or panic guards.
229 ///
230 /// Callers that observe an exit status are responsible for calling
231 /// [`Self::must_not_be_terminated`] when they take ownership of the lifecycle: the wait,
232 /// terminate, and kill paths do this explicitly when they succeed. `is_running` deliberately
233 /// does not, so a status check never silently disarms the safeguards.
234 pub(super) fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
235 self.child.try_wait()
236 }
237
238 /// Checks if the process is currently running.
239 ///
240 /// Returns [`RunningState::Running`] if the process is still running,
241 /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
242 /// if the state could not be determined. Each call re-runs the underlying
243 /// `try_wait`, so a transient probing failure observed once does not become permanent.
244 ///
245 /// This is a pure status query: it does not disarm the drop-cleanup or panic guards, even
246 /// when it observes that the process has exited. A handle whose status reads
247 /// [`RunningState::Terminated`] still panics on drop until one of the lifecycle methods has
248 /// closed it. Use [`Self::wait_for_completion`], [`Self::terminate`], or [`Self::kill`] to
249 /// close the lifecycle through a successful terminal call, or call
250 /// [`Self::must_not_be_terminated`] explicitly to detach the handle without termination.
251 //noinspection RsSelfConvention
252 pub fn is_running(&mut self) -> RunningState {
253 match self.try_reap_exit_status() {
254 Ok(None) => RunningState::Running,
255 Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
256 Err(err) => RunningState::Uncertain(err),
257 }
258 }
259}
260
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance, stdin handle,
    /// and stdout/stderr output streams.
    ///
    /// The returned [`Child`] no longer owns its `stdin` field because this crate separates piped
    /// stdin into [`Stdin`]. Keep the returned [`Stdin`] alive to keep the child's stdin pipe open.
    /// Dropping [`Stdin::Open`] closes the pipe, so the child may observe EOF and exit or otherwise
    /// change behavior.
    ///
    /// The handle is detached via [`Self::must_not_be_terminated`] before it is taken apart, so
    /// from this point on the caller is responsible for waiting on or killing the returned
    /// [`Child`].
    pub fn into_inner(mut self) -> (Child, Stdin, Stdout, Stderr) {
        // Detach first, while the handle is still intact, so the guard state that is dropped
        // below is the detached one.
        self.must_not_be_terminated();
        // Suppress the handle's own drop glue; every field is moved out or dropped by hand below.
        // NOTE(review): presumably needed because `ProcessHandle` implements `Drop` (see the
        // `drop_guard` module), which rules out plain destructuring. Confirm there.
        let mut this = ManuallyDrop::new(self);

        // SAFETY: `this` is wrapped in `ManuallyDrop`, so moving out fields with `ptr::read` will
        // not cause the original `ProcessHandle` destructor to run. We explicitly drop the fields
        // not returned from this function exactly once before returning the owned parts.
        // Each of the six fields is touched exactly once: four are read out and returned, two
        // are dropped in place. Nothing accesses `this` after that.
        unsafe {
            let child = std::ptr::read(&raw const this.child);
            let stdin = std::ptr::read(&raw const this.std_in);
            let stdout = std::ptr::read(&raw const this.std_out_stream);
            let stderr = std::ptr::read(&raw const this.std_err_stream);

            // The remaining fields are not handed to the caller; drop them here so that an owned
            // name or guard state is not leaked.
            std::ptr::drop_in_place(&raw mut this.name);
            std::ptr::drop_in_place(&raw mut this.drop_mode);

            (child, stdin, stdout, stderr)
        }
    }
}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process, RunningState,
        test_support::long_running_command,
    };
    use assertr::prelude::*;
    use std::process::Stdio;
    use std::time::Duration;

    /// State transitions of the standalone [`Stdin`] wrapper.
    mod stdin {
        use super::*;

        #[tokio::test]
        async fn open_stdin_reports_open_until_closed() {
            // Spawn a raw child with a piped stdin so that a real `ChildStdin` is available.
            let mut child = long_running_command(Duration::from_secs(5))
                .stdin(Stdio::piped())
                .spawn()
                .unwrap();
            let child_stdin = child.stdin.take().unwrap();
            let mut stdin = Stdin::Open(child_stdin);

            assert_that!(stdin.is_open()).is_true();
            assert_that!(stdin.as_mut().is_some()).is_true();

            stdin.close();

            assert_that!(stdin.is_open()).is_false();
            assert_that!(stdin.as_mut()).is_none();

            // The child is not managed by a `ProcessHandle`; clean it up by hand.
            child.kill().await.unwrap();
        }

        #[test]
        fn closed_stdin_reports_closed() {
            let mut stdin = Stdin::Closed;

            assert_that!(stdin.is_open()).is_false();
            assert_that!(stdin.as_mut()).is_none();

            // Closing an already closed stdin must be a no-op.
            stdin.close();

            assert_that!(stdin.is_open()).is_false();
            assert_that!(stdin.as_mut()).is_none();
        }
    }

    /// Behavior of [`ProcessHandle::is_running`] as a pure status query.
    mod is_running {
        use super::*;

        #[tokio::test]
        async fn does_not_disarm_drop_guards_when_process_has_exited() {
            let mut process = Process::new(long_running_command(Duration::from_millis(50)))
                .name(AutoName::program_only())
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            // Sleep well past the command's 50 ms duration so the child has exited by now.
            tokio::time::sleep(Duration::from_millis(200)).await;

            // Observe the exit through the query API. Even with the child reaped, the drop
            // guards must remain armed because is_running is documented as a pure status query.
            let _state = process.is_running();

            assert_that!(process.is_drop_armed()).is_true();

            // Detach explicitly so the test does not panic when `process` drops.
            process.must_not_be_terminated();
        }

        #[tokio::test]
        async fn reports_running_before_wait_and_terminated_after_wait() {
            let mut process = Process::new(long_running_command(Duration::from_secs(1)))
                .name(AutoName::program_only())
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            // Queried right after spawning: the one-second command must still be alive.
            match process.is_running() {
                RunningState::Running => {}
                RunningState::Terminated(exit_status) => {
                    assert_that!(exit_status).fail("process should still be running");
                }
                RunningState::Uncertain(_) => {
                    assert_that!(&process).fail("process state should not be uncertain");
                }
            }

            process
                .wait_for_completion(Duration::from_secs(2))
                .await
                .unwrap();

            // After a successful wait the status query must report the exit.
            match process.is_running() {
                RunningState::Running => {
                    assert_that!(process).fail("process should not be running anymore");
                }
                RunningState::Terminated(exit_status) => {
                    assert_that!(exit_status.code()).is_some().is_equal_to(0);
                    assert_that!(exit_status.success()).is_true();
                }
                RunningState::Uncertain(_) => {
                    assert_that!(process).fail("process state should not be uncertain");
                }
            }
        }
    }

    /// Behavior of [`ProcessHandle::into_inner`].
    // No `#[cfg(test)]` here: the enclosing `tests` module is already test-only.
    mod into_inner {
        use super::*;
        use crate::LineParsingOptions;
        use crate::test_support::line_collection_options;
        use tokio::io::AsyncWriteExt;

        #[tokio::test]
        async fn returns_stdin_with_pipe_still_open() {
            let cmd = tokio::process::Command::new("cat");
            let process = Process::new(cmd)
                .name("cat")
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            let (mut child, mut stdin, stdout, _stderr) = process.into_inner();
            // Stdin is handed out separately, so the child itself must no longer hold it.
            assert_that!(child.stdin.is_none()).is_true();

            // `cat` exits once it sees EOF; still running here means the pipe stayed open.
            tokio::time::sleep(Duration::from_millis(100)).await;
            assert_that!(child.try_wait().unwrap().is_none()).is_true();

            let collector = stdout
                .collect_lines_into_vec(LineParsingOptions::default(), line_collection_options());

            let Some(stdin_handle) = stdin.as_mut() else {
                assert_that!(stdin.is_open()).fail("stdin should be returned open");
                return;
            };
            stdin_handle
                .write_all(b"stdin stayed open\n")
                .await
                .unwrap();
            stdin_handle.flush().await.unwrap();

            // Closing stdin delivers EOF, which lets `cat` finish.
            stdin.close();

            let status = tokio::time::timeout(Duration::from_secs(2), child.wait())
                .await
                .unwrap()
                .unwrap();
            assert_that!(status.success()).is_true();

            let collected = collector.wait().await.unwrap();
            assert_that!(collected.lines().len()).is_equal_to(1);
            assert_that!(collected[0].as_str()).is_equal_to("stdin stayed open");
        }

        #[tokio::test]
        async fn defuses_panic_guard() {
            let process = Process::new(long_running_command(Duration::from_secs(5)))
                .name("long-running")
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            // The handle is consumed while the child is still running; reaching the end of the
            // test without a panic is the assertion.
            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
            child.kill().await.unwrap();
            let _status = child.wait().await.unwrap();
        }

        #[tokio::test]
        async fn supports_handles_built_with_owned_name() {
            // `format!` yields an owned `String`, exercising the owned-name path of the handle.
            let process = Process::new(long_running_command(Duration::from_secs(5)))
                .name(format!("sleeper-{}", 7))
                .stdout_and_stderr(|stream| {
                    stream
                        .broadcast()
                        .best_effort_delivery()
                        .no_replay()
                        .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                        .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
                })
                .spawn()
                .unwrap();

            let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
            child.kill().await.unwrap();
            let _status = child.wait().await.unwrap();
        }
    }
}