# tokio-process-tools
A correctness-focused async subprocess library for Tokio. It wraps tokio::process::Command and exposes explicit
controls over resource limits, output consumption, process termination, and cleanup.
Most subprocess code in the wild leaves a handful of decisions implicit:
- What happens when a log consumer falls behind? Is data dropped or is the child blocked?
- Can a stream consumer reliably receive all child startup output even when attached late?
- How much memory will the system need for stream processing? How much for line processing?
- When and how is the child terminated?
- What happens to the child if the parent panics?
Each of those questions is typically answered by a hidden default. Any one of them may be fine in isolation, but in aggregate system behavior becomes dangerously uncertain: silent gaps, leaked children, unbounded buffering.
tokio-process-tools makes every one of those choices a required argument at the process construction call site. There
is no default delivery or replay policy and no default buffer sizes. The cost is a more verbose spawn call; the benefit
is that the choice is visible in code review and to whoever debugs the system at 3 a.m.
## When to use this
Reach for it when you are developing any of:
- a build or development tool that streams live output and keeps a bounded tail for failure reports,
- an integration test that starts a real service and waits until it is ready,
- an orchestration layer that must not leak children after failures or timeouts,
Or when you need any of:
- one stream consumed by more than one subsystem (e.g. logging plus readiness checks),
- bounded memory in the face of untrusted or unknown output volume.
If `tokio::process::Command` already gives you enough control, this crate may add too much ceremony.
## Core model
The library has three layers, and most of the configuration in this README is about the relationship between them:
- A `Process` is a spawn configuration. `Process::new(command)` starts the chain; `.spawn()` returns a `ProcessHandle`.
- An output stream is the buffered pipeline that sits between a child's stdout (or stderr) and your code. Each stream is configured at spawn time (see Choosing stream settings) and emits raw chunks, which consumers parse on the fly.
- A consumer is anything that reads from a stream: an inspector that runs a callback, a collector that retains output for later, a `wait_for_line()` that resolves on a match. Consumers are implemented through a visitor pattern, allowing custom extensions to be written.
Consumers attach after spawn, so anything the child writes before a consumer is in place is gone unless a replay policy retains it for late arrivals.
A ProcessHandle is "armed" until it is successfully waited, terminated, killed, or explicitly excused with
must_not_be_terminated(). Dropping an armed handle without one of those calls runs best-effort cleanup and then
panics. Better to be loud than to silently leak children.
See Automatic termination on drop for the opt-out.
## Quickstart
Add to your `Cargo.toml`:

```toml
[dependencies]
tokio-process-tools = "0.9.0"
tokio = { version = "1", features = ["macros", "process", "rt-multi-thread"] }
```
The interactive-stdin example further down uses tokio::io::AsyncWriteExt, which requires the io-util Tokio feature
in your own crate.
The minimum viable spawn: run a command to completion with a deadline and check its exit status. stream.discard()
routes the child's stdout and stderr to /dev/null at the OS level, so no pipe is allocated and no reader task runs in
the parent. Use this when you only care about whether the process succeeded, not what it wrote.
```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // Sketch reconstructed from this README's prose; exact signatures may differ.
    let handle = Process::new(Command::new("echo"))
        .name("quickstart")
        .stdout(|stream| stream.discard())
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    let status = handle
        .wait_for_completion(Duration::from_secs(5))
        .await
        .expect_completed("child did not exit within the deadline");
    assert!(status.success());
}
```
A discarded stream still implements OutputStream (so process.stdout() and process.stderr() still return
something), but it does not implement TrySubscribable: the consumer-attaching APIs (wait_for_completion_with_output,
inspect_lines, collect_lines, etc.) are not in scope on a handle whose stream is discarded. If you later decide you
do want to read the output, swap discard() for one of the streaming chains described in the next section.
Code throughout this README uses use tokio_process_tools::*; to keep examples short; prefer named imports in
application code. Byte sizes use the NumBytes type, and the NumBytesExt trait (re-exported at the crate root) lets
you write 1.megabytes(), 16.kilobytes(), or 512.bytes() on integer (usize) literals.
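The `NumBytesExt`-style helpers follow the ordinary Rust extension-trait pattern. The sketch below is std-only and illustrative (hypothetical trait name, 1024-based units assumed), not the crate's actual definitions:

```rust
/// Illustrative sketch of the extension-trait pattern behind helpers like
/// `1.megabytes()`; names and unit bases here are assumptions, not the
/// crate's real API.
trait ByteSizeExt {
    fn bytes(self) -> usize;
    fn kilobytes(self) -> usize;
    fn megabytes(self) -> usize;
}

impl ByteSizeExt for usize {
    fn bytes(self) -> usize {
        self
    }
    fn kilobytes(self) -> usize {
        self * 1024 // assuming 1024-based units
    }
    fn megabytes(self) -> usize {
        self * 1024 * 1024
    }
}

fn main() {
    // The trait must be in scope for the method-call syntax to resolve.
    assert_eq!(16usize.kilobytes(), 16_384);
    assert_eq!(1usize.megabytes(), 1_048_576);
}
```

Bringing the trait into scope is what makes `16.kilobytes()` resolve as a method call on a plain integer.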
## Choosing stream settings
Each stdout/stderr stream is configured along five axes: backend, delivery, replay, collection, and buffering.
### Backend: who consumes the stream?

- `broadcast()`: allows multiple concurrent consumers, e.g. logging plus a line waiter.
- `single_subscriber()`: exactly one consumer may be active at a time. A second consumer started while another is still running fails immediately with `StreamConsumerError::ActiveConsumer`. Reach for this when you only ever read the stream from one place at a time; it turns accidental fan-out into a loud error at the point of the mistake. Once a consumer is dropped, a new one can be registered; just never two at any given time. This implementation may be slightly more performant.
- `discard()`: no consumption at all. The matching child stdio slot is set to `Stdio::null()`, so the OS drops the bytes; no pipe is allocated and no reader task runs in the parent. The remaining axes (delivery, replay, buffering) are skipped because they have nothing to act on. Reach for this when only the exit status matters; pair it with a consumed stream on the other slot if you still want one stream's output (`.stdout(|s| s.discard()).stderr(|s| s.broadcast()...)` is fine).
### Delivery: what happens when a consumer can't keep up?

- `reliable_for_active_subscribers()`: when an active consumer's buffer fills, the child is paused (its next write blocks) instead of data being dropped. The guarantee is scoped to consumers that are already attached: a consumer that arrives after the child has produced output gets nothing unless you also configure a replay policy. Use when definitely receiving all output matters more than keeping the child unblocked.
- `best_effort_delivery()`: slow consumers may observe gaps. Line consumers drop the in-progress partial line and resync at the next newline. Use when latency matters more than completeness. This never blocks the child.
### Replay: what does a consumer attached after spawn see?

- `.no_replay()`: live output only. A consumer that attaches after the child has already printed something will not see it.
- `.replay_last_bytes(...)` / `.replay_last_chunks(...)` / `.replay_all()`: the library retains output so a late consumer can start from history. The first two cap retention; `.replay_all()` is unbounded and should only be used for trusted output volume.
The typical lifecycle for a long-running service is: spawn with a replay policy, attach any consumers you want, then
call `seal_output_replay()` to release the retained history. The seal applies to future consumers: they start at live
output. Consumers that were already attached when the seal happens are unaffected, and whatever the delivery policy
guaranteed them is still there to read. The per-stream variants `seal_stdout_replay()` and `seal_stderr_replay()` cover
the case where one stream's startup phase ends before the other's.
### Collection: how much memory does the library hold?
Use bounded LineCollectionOptions / RawCollectionOptions for untrusted output. Reserve TrustedUnbounded for
processes whose output volume is known and controlled. The bounded variants take a CollectionOverflowBehavior that
decides what to keep when the cap is reached: DropAdditionalData (default) keeps the first-retained output and
discards anything that arrives after the cap is hit, while DropOldestData keeps the newest output by evicting older
retained data, which gives you a ring-buffer-shaped tail.
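The difference between the two overflow behaviors can be pictured with a plain bounded buffer. The sketch below is illustrative std-only code with hypothetical types, not the crate's implementation:

```rust
/// Illustrative stand-ins for the two bounded-collection overflow behaviors.
enum Overflow {
    DropAdditionalData, // keep the first-retained lines, ignore later ones
    DropOldestData,     // evict old lines so the newest are kept (ring-buffer tail)
}

struct BoundedLines {
    cap: usize,
    behavior: Overflow,
    lines: Vec<String>,
}

impl BoundedLines {
    fn push(&mut self, line: &str) {
        if self.lines.len() < self.cap {
            self.lines.push(line.to_string());
            return;
        }
        match self.behavior {
            // Cap hit: the new line is discarded.
            Overflow::DropAdditionalData => {}
            // Cap hit: the oldest line is evicted to make room for the new one.
            Overflow::DropOldestData => {
                self.lines.remove(0);
                self.lines.push(line.to_string());
            }
        }
    }
}

fn main() {
    let mut tail = BoundedLines { cap: 2, behavior: Overflow::DropOldestData, lines: Vec::new() };
    for l in ["a", "b", "c"] {
        tail.push(l);
    }
    // DropOldestData keeps the newest output.
    assert_eq!(tail.lines, ["b", "c"]);
}
```

`DropOldestData` is the behavior you want for "keep the last N lines for the failure report"; `DropAdditionalData` for "keep how the process started".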
### Buffering: how big a read, how deep a queue?

Every chain ends with two buffering knobs:
- `.read_chunk_size(...)` is the size of each `read()` against the child's pipe. Smaller values let a consumer see partial results from large bursts sooner and cap memory; larger values reduce per-burst syscall and per-chunk overhead.
- `.max_buffered_chunks(...)` is the depth of the in-process queue between the reader task and the consumer. Larger values absorb more bursty output before backpressure or dropping kicks in (depending on the delivery policy); smaller values cap memory.
Pass DEFAULT_READ_CHUNK_SIZE and DEFAULT_MAX_BUFFERED_CHUNKS unless you have measured a reason to tune them. The
defaults (16 KB and 128 chunks) are tuned for general-purpose streaming. Both knobs must be named at the spawn site:
the spelled-out values keep the choice visible in code review and stop accidental mismatches between sites that all
silently rely on a default.
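One useful consequence of naming both knobs: a worst-case bound on the queue's footprint per stream is simply their product, under the assumption that every queue slot holds a full read chunk:

```rust
fn main() {
    // Back-of-the-envelope bound with the defaults: 16 KB chunks, 128-deep queue.
    let read_chunk_size = 16 * 1024;
    let max_buffered_chunks = 128;
    // Worst case: every slot in the queue holds a full chunk.
    let worst_case_bytes = read_chunk_size * max_buffered_chunks;
    assert_eq!(worst_case_bytes, 2 * 1024 * 1024); // 2 MiB per stream
}
```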
The async line-aware consumers (inspect_lines_async, collect_lines_async, collect_lines_into_write, and friends
that take an AsyncLineSink) materialize every parsed line as an owned String before invoking the per-line callback.
The synchronous line consumers can pass the line as Cow::Borrowed straight out of the chunk on the fast path, so they
avoid the allocation. Prefer the synchronous flavor when per-line work is non-blocking; reach for the async flavor only
when the per-line callback genuinely needs to .await.
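The borrow-versus-allocate distinction is ordinary `Cow` usage. A std-only sketch of the fast path (illustrative helper, not the crate's actual parser): a line contained entirely in one chunk is handed to the callback as a borrow, and only a line spanning chunk boundaries needs an owned `String`:

```rust
use std::borrow::Cow;

/// Illustrative sketch: split a chunk into lines, borrowing when a line sits
/// wholly inside the chunk and allocating only for lines that span chunks.
fn each_line<'a>(chunk: &'a str, carry: &mut String, mut f: impl FnMut(Cow<'a, str>)) {
    let mut rest = chunk;
    while let Some(pos) = rest.find('\n') {
        let (line, tail) = rest.split_at(pos);
        rest = &tail[1..]; // skip the newline itself
        if carry.is_empty() {
            f(Cow::Borrowed(line)); // fast path: no allocation
        } else {
            carry.push_str(line);
            f(Cow::Owned(std::mem::take(carry))); // line spanned a chunk boundary
        }
    }
    carry.push_str(rest); // stash the in-progress partial line
}

fn main() {
    let mut carry = String::new();
    let mut seen: Vec<String> = Vec::new();
    each_line("a\nb", &mut carry, |l| seen.push(l.into_owned()));
    each_line("c\n", &mut carry, |l| seen.push(l.into_owned()));
    assert_eq!(seen, ["a", "bc"]);
}
```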
## Consumers
tokio-process-tools provides the following consumers out of the box:
- `inspect_lines` / `inspect_chunks` (and their `_async` variants) run a callback per item and discard the data. Reach for these when only the side effect matters: forward to `tracing`, count occurrences, react to a milestone.
- `collect_lines` / `collect_chunks` (and `_async` variants) retain output in memory for later inspection. Reach for these when a test or caller wants to assert on what the child wrote after it has finished and `wait_for_completion_with_output` doesn't fit.
- `collect_lines_into_write` / `collect_chunks_into_write` (plus `_mapped` variants) forward output to an async writer (a file, a TCP sink, anything `AsyncWrite`). `WriteCollectionOptions` controls writer-failure handling and `LineWriteMode` controls whether parsed lines get the stripped `\n` reattached. See Stream output to a sink for an end-to-end example.
See the Custom consumer / stream visitor example for guidance on how to build your own consumers.
### Consumer lifecycle
Each `Consumer<S>` handle owns the spawned task. Dropping the handle drops the task, which cancels the consumer
immediately. Bind it to a variable, e.g. `let _consumer = process.stdout().inspect_lines(...)`, to silence the
unused-result warning (if you don't intend to interact with the handle). The `#[must_use]` annotation on every
consumer-creator is the compiler's reminder of this contract. If your process is long-lived, store consumer handles
somewhere so that they are not dropped.
A Consumer exposes three ways to finish. The choice is a trade-off between recovering state (the writer, the
collected output) and not hanging:
- `wait()` lets the consumer drain naturally. It returns when the stream closes (the child exited and any retained replay has been read out) or the visitor returned `Next::Break`. Use this when there is a natural endpoint and you want whatever the consumer produced; this is the path that gives you the writer back from a `collect_*_into_write` consumer.
- `cancel(timeout)` is the bounded cooperative-then-forceful shutdown. The consumer task is signaled to stop at the next safe point. If it observes the cancellation and exits before `timeout` elapses, you get `ConsumerCancelOutcome::Cancelled(sink)` with the sink preserved. If `timeout` elapses first, the task is aborted and you get `ConsumerCancelOutcome::Aborted` with the sink dropped. Pick the timeout based on how long the consumer might legitimately need to flush.
- `abort()` is unconditional forceful shutdown. The task's in-flight future is dropped, which means sinks and writers are not returned (you cannot recover the file handle, the partially collected `Vec`, etc.). Reach for this when not getting stuck matters more than recovering state and you do not want to pay any timeout at all.
## Examples
Patterns past the basic spawn-and-wait shown in the Quickstart.
### Wait and collect output
Like the Quickstart spawn, but the stdout/stderr that appeared during the run is captured and returned. The collector
here is attached inside the wait helper, after the child has already started producing output, which is why the stream
is configured with replay_last_bytes(...). Without that policy, anything the child wrote before the helper attached
the collector would be lost (see Core model). The wait timeout covers both process exit and the final
drain of stdout/stderr.
```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // Sketch reconstructed from this README's prose; exact builder and option
    // shapes may differ in detail.
    let handle = Process::new(Command::new("ls"))
        .name("collect-output")
        .stdout(|stream| {
            stream
                .broadcast()
                .reliable_for_active_subscribers()
                // Retain output so the collector attached inside the wait helper
                // still sees what was written before it attached.
                .replay_last_bytes(1.megabytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    // Field names here are assumed; the Bounded { max_bytes, overflow_behavior }
    // shape is described under "Collection" above.
    let options = LineOutputOptions {
        collection: LineCollectionOptions::Bounded {
            max_bytes: 1.megabytes(),
            overflow_behavior: CollectionOverflowBehavior::DropOldestData,
        },
        parsing: LineParsingOptions::default(),
    };

    let output = handle
        .wait_for_completion_with_output(Duration::from_secs(5), options)
        .await
        .expect_completed("child did not exit within the deadline");
    println!("{output:?}");
}
```
Use wait_for_completion_with_raw_output() when the child's output is not UTF-8 line-oriented: binary blobs (image
conversion, archive tools), framed protocols, or anything where line parsing would corrupt or lose bytes. The shape of
the call is the same; substitute RawOutputOptions for LineOutputOptions, RawCollectionOptions for
LineCollectionOptions, and read the result as CollectedBytes { bytes, truncated } on each stream. No
LineParsingOptions is involved. RawCollectionOptions offers the same Bounded { max_bytes, overflow_behavior }
and TrustedUnbounded variants as the line equivalent.
### Wait for readiness from output
The classic "start a service in an integration test and wait until it logs that it is ready" pattern. wait_for_line()
is itself a stream consumer with a required timeout, so under single_subscriber() it competes with anything else
reading the stream. Reach for broadcast() whenever readiness detection has to coexist with a logger. Pair it with
replay retention so a waiter attached after spawn can still see startup lines, then seal_output_replay() to free that
history once the service is up.
```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // Sketch reconstructed from this README's prose; exact signatures may differ.
    let handle = Process::new(Command::new("my-service"))
        .name("readiness")
        .stdout(|stream| {
            stream
                .broadcast()
                .reliable_for_active_subscribers()
                // Retain startup output so a waiter attached after spawn still sees it.
                .replay_last_bytes(1.megabytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    handle
        .stdout()
        .wait_for_line(
            |line| line.contains("listening on"),
            LineParsingOptions::default(),
            Duration::from_secs(30),
        )
        .await
        .expect("service never reported readiness");

    // The retained startup history is no longer needed once the service is up.
    handle.seal_output_replay();
}
```
### Interactive stdin
Children always have stdin piped (there is no opt-out), but the library only flushes/closes the pipe when you ask.
Calling stdin().close() lets a child observe EOF before any wait helper runs (some programs only finish on EOF, not
on idle). Terminal waits and kill() close any still-open stdin automatically, so the manual close is only needed
when the child must see EOF before the wait.
process.stdin() returns a &mut Stdin, an enum with Open(ChildStdin) and Closed variants. Stdin::as_mut()
projects that into Option<&mut ChildStdin>, returning None once the slot has transitioned to Closed (either
explicitly via stdin().close() or implicitly because a wait helper or kill() has already cleaned it up). The
example below pattern-matches with if let Some(stdin) = process.stdin().as_mut() for that reason. The None branch
has no other failure mode in the stdin-is-always-piped contract.
```rust
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // Sketch reconstructed from this README's prose; exact signatures may differ.
    let mut handle = Process::new(Command::new("cat"))
        .name("interactive-stdin")
        .stdout(|stream| stream.discard())
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    // Stdin is always piped; as_mut() returns None once the slot is Closed.
    if let Some(stdin) = handle.stdin().as_mut() {
        stdin.write_all(b"hello\n").await.expect("write failed");
        stdin.flush().await.expect("flush failed");
    }

    // `cat` only exits on EOF, so close stdin before waiting on completion.
    handle.stdin().close();

    handle
        .wait_for_completion(Duration::from_secs(5))
        .await
        .expect_completed("child did not exit after EOF");
}
```
### Stream output to a sink
When the child will produce more output than is reasonable to hold in memory (build tools, long-running servers in CI
logs, or anything you want on disk rather than in Vec<String>), point a collector at an AsyncWrite sink instead.
The example below tees stdout into a file. Parsed lines are written one per call, with LineWriteMode::AppendLf
re-attaching the trailing newline that line parsing strips. WriteCollectionOptions::log_and_continue() keeps
collection running if the file write ever fails (disk full, broken pipe to a piped writer, etc.); use fail_fast()
instead when a sink failure should stop the collection.
The collector is yours to drive. After the child exits the stream closes, the visitor drains, and wait() returns
the file handle so you can flush or close it deterministically.
This example needs the fs Tokio feature in your own crate, in addition to the features listed in the
Quickstart.
```rust
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // Sketch reconstructed from this README's prose; exact signatures and
    // argument order may differ in detail.
    let mut command = Command::new("cargo");
    command.arg("build");

    let handle = Process::new(command)
        .name("build-logs")
        .stdout(|stream| {
            stream
                .single_subscriber()
                .reliable_for_active_subscribers()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    let file = File::create("build.log").await.expect("failed to create log file");
    let collector = handle.stdout().collect_lines_into_write(
        file,
        LineParsingOptions::default(),
        LineWriteMode::AppendLf,
        WriteCollectionOptions::log_and_continue(),
    );

    handle
        .wait_for_completion(Duration::from_secs(600))
        .await
        .expect_completed("build did not finish in time");

    // Draining returns the writer so it can be flushed and closed deterministically.
    let mut file = collector.wait().await.expect("collector failed");
    file.flush().await.expect("flush failed");
}
```
### Timeout and Cleanup
For processes that may hang, wait_for_completion_or_terminate() rolls the "interrupt → terminate → kill"
escalation into the wait so you don't have to sequence cleanup yourself. The three durations correspond to: how long
to wait for a clean exit, how long to give the child after the interrupt signal, and how long to give it after the
terminate signal before falling back to kill().
```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // Sketch reconstructed from this README's prose; exact signatures may differ.
    let handle = Process::new(Command::new("my-flaky-tool"))
        .name("maybe-hangs")
        .stdout(|stream| stream.discard())
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    let outcome = handle
        .wait_for_completion_or_terminate(
            Duration::from_secs(30), // how long to wait for a clean exit
            Duration::from_secs(5),  // grace period after the interrupt signal
            Duration::from_secs(5),  // grace period after terminate, then kill()
        )
        .await;
    println!("{outcome:?}"); // Completed or TerminatedAfterTimeout
}
```
### Custom consumer / stream visitor
When the built-in `inspect_*` and `collect_*` consumers don't fit, implement `StreamVisitor` (sync) or
`AsyncStreamVisitor` (async) directly and attach it with `consume_with` / `consume_with_async`. The cases that pay
for a custom visitor are: maintaining state across chunks, reacting explicitly to gap notifications, or producing an
output type the built-ins don't expose. If all you need is a side effect per chunk or per line, reach for
inspect_chunks / inspect_lines; the closure form is simpler.
Pick StreamVisitor when on_chunk can return without .await-ing (counters, in-memory state machines, channel
try_send, simple synchronous I/O). Pick AsyncStreamVisitor when chunk handling needs to await (network sends,
async writers, channel send with backpressure). Both traits require Send + 'static, and both return their final
value via into_output(self) at the end of the consumer task's life.
The example below tracks how many bytes and chunks the stream produced, plus how many gap events the backend reported.
on_gap is invoked when one or more chunks were dropped between the previous and next observed chunk (only possible
under best_effort_delivery() overflow), and visitors that splice bytes across chunks should treat it as a reset
signal.
```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

// Sketch reconstructed from this README's prose; the exact trait surface
// (associated types, chunk argument type) may differ in detail.
#[derive(Debug, Default)]
struct Stats {
    bytes: usize,
    chunks: usize,
    gaps: usize,
}

impl StreamVisitor for Stats {
    type Output = Stats;

    fn on_chunk(&mut self, chunk: &[u8]) -> Next {
        self.bytes += chunk.len();
        self.chunks += 1;
        Next::Continue
    }

    fn on_gap(&mut self) {
        // One or more chunks were dropped; any byte-splicing state is now stale.
        self.gaps += 1;
    }

    fn into_output(self) -> Self::Output {
        self
    }
}

#[tokio::main]
async fn main() {
    let handle = Process::new(Command::new("ls"))
        .name("visitor-stats")
        .stdout(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery() // gaps are only possible under best effort
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    let consumer = handle.stdout().consume_with(Stats::default());
    handle
        .wait_for_completion(Duration::from_secs(5))
        .await
        .expect_completed("child did not exit in time");
    let stats = consumer.wait().await.expect("consumer failed");
    println!("{stats:?}");
}
```
A few lifecycle quirks worth knowing. on_gap() is synchronous in both traits because gap notifications carry no
data to await on. on_eof() is skipped when on_chunk returned Next::Break, on the assumption that an
early-break visitor already has its result; cancellation and abort skip on_eof for the same reason. Whichever exit
path was taken, into_output always runs, so Consumer::wait() and cancel() always have a value to yield.
## Further APIs
The Quickstart and Examples cover the common path. The rest of the surface is small but purposeful. Here is when you would reach for each piece.
### Wait helper variants
wait_for_completion_with_output() and friends come in four combinations: line vs. raw bytes, and plain wait vs.
wait-or-terminate. Pick the variant that matches what you need:
- Line vs. raw. Line collection parses on the fly and gives you `Vec<String>`-style output; raw collection gives you the bytes the child actually wrote, which matters when output isn't UTF-8 line-oriented.
- Plain vs. `_or_terminate`. The `_or_terminate` variants embed the same interrupt → terminate → kill escalation shown in Timeout and Cleanup, so a hung child still gets cleaned up before the helper returns.
Each plain wait helper returns a WaitForCompletionResult (Completed or Timeout); the _or_terminate variants
return WaitForCompletionOrTerminateResult (Completed or TerminatedAfterTimeout). The examples above use
.expect_completed(msg) to panic on the non-completed branch when a timeout would be a hard failure; reach for
.into_completed() instead when the caller should handle both outcomes itself.
When you need output collection and hung-child cleanup together, reach for the combined
wait_for_completion_with_output_or_terminate() (or its _with_raw_output_or_terminate() sibling). It takes the
same LineOutputOptions / RawOutputOptions you would pass to _with_output, plus the three durations you would
pass to _or_terminate, with no extra ceremony.
For trusted, bounded-by-construction output, both LineCollectionOptions and RawCollectionOptions have a
TrustedUnbounded variant that opts out of memory caps.
### Line parsing
LineParsingOptions controls how chunks are split into lines and what happens at the limits.
`wait_for_line()` takes the same `LineParsingOptions` so its matcher sees the same lines a collector would. Three
fields:
- `max_line_length` (default 16 KB) caps a single line. It must be greater than zero; pass `NumBytes::MAX` explicitly to opt into unbounded line parsing on a trusted source.
- `LineOverflowBehavior` decides what happens past the cap: `DropAdditionalData` discards bytes until the next newline (the default), and `EmitAdditionalAsNewLines` splits the long line into successive emitted lines so no data is lost.
- `buffer_compaction_threshold` (default `None`) is a steady-state memory knob. The parser's two internal buffers retain capacity for whichever was the largest line they have ever held; setting `Some(n)` forces buffers above `n` to be released between lines. Useful with mostly-small lines and occasional outliers (especially under `NumBytes::MAX`), counterproductive when typical lines are already near the threshold.
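A toy model of the two overflow behaviors makes the difference concrete. This is illustrative std-only code (ASCII input assumed), not the crate's parser:

```rust
/// Illustrative sketch of the two line-overflow behaviors: truncate-and-drop
/// versus splitting the overlong line into successive emitted lines.
fn split_line(line: &str, max_len: usize, emit_extra: bool) -> Vec<String> {
    if line.len() <= max_len {
        return vec![line.to_string()];
    }
    if emit_extra {
        // EmitAdditionalAsNewLines: no data is lost, the line is just split.
        line.as_bytes()
            .chunks(max_len)
            .map(|c| String::from_utf8_lossy(c).into_owned())
            .collect()
    } else {
        // DropAdditionalData: keep the first max_len bytes, discard the rest
        // (in the real parser: discard until the next newline).
        vec![line[..max_len].to_string()]
    }
}

fn main() {
    assert_eq!(split_line("abcdef", 4, false), ["abcd"]);
    assert_eq!(split_line("abcdef", 4, true), ["abcd", "ef"]);
}
```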
When chunks drop under best_effort_delivery(), line parsers conservatively discard the in-progress partial line and
resynchronize at the next newline rather than splicing across the gap. Callers should not assume that a missing
newline after a gap means "the next line continues the previous one"; treat each post-gap line as fresh.
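The resync rule can be sketched as a small state machine over chunks and gap events. This is illustrative std-only code, not the crate's parser:

```rust
/// Illustrative sketch of gap-aware line assembly: on a gap, discard the
/// in-progress partial line and resynchronize at the next newline.
struct LineAssembler {
    partial: String,
    skipping_to_newline: bool,
}

impl LineAssembler {
    fn new() -> Self {
        Self { partial: String::new(), skipping_to_newline: false }
    }

    /// One or more chunks were lost between observed chunks.
    fn on_gap(&mut self) {
        self.partial.clear();
        self.skipping_to_newline = true;
    }

    fn on_chunk(&mut self, chunk: &str, out: &mut Vec<String>) {
        for ch in chunk.chars() {
            if self.skipping_to_newline {
                // Bytes before the next newline may belong to a torn line.
                if ch == '\n' {
                    self.skipping_to_newline = false;
                }
                continue;
            }
            if ch == '\n' {
                out.push(std::mem::take(&mut self.partial));
            } else {
                self.partial.push(ch);
            }
        }
    }
}

fn main() {
    let mut asm = LineAssembler::new();
    let mut lines: Vec<String> = Vec::new();
    asm.on_chunk("first\nsec", &mut lines);
    asm.on_gap(); // "sec..." is torn; everything up to the next '\n' is unreliable
    asm.on_chunk("ond-tail\nthird\n", &mut lines);
    assert_eq!(lines, ["first", "third"]);
}
```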
### Manual process control
When the wait-and-cleanup helpers do not fit, drive shutdown by hand: send_interrupt_signal(),
send_terminate_signal(), terminate() (escalation up to kill), and kill() (immediate). Each returns a typed
TerminationError describing which step failed. The signal-send methods are idempotent against a child that has
already exited: they reap the status and return Ok(()) without sending a signal at a possibly-recycled PID.
seal_stdout_replay(), seal_stderr_replay(), and seal_output_replay() end replay retention on one stream or
both.
## Process management
### Process naming
A process name is required before stdout/stderr can be configured. The name appears in error messages and tracing spans, and the library refuses to spawn anonymous processes because anonymous failures are hard to debug.
For production workflows, prefer a stable explicit label ("api-worker", format!("agent-{id}")) so logs and
metrics group cleanly. The AutoName variants are convenient for ad-hoc and test code, but the variants that
include arguments, environment, or current directory will surface those values in error messages, so only use them
when those values are safe to log.
Canonical naming forms:
- Explicit label: `.name("api-worker")` or `.name(format!("agent-{id}"))`. The recommended default.
- Safe automatic: `.name(AutoName::program_only())`. Uses just the program name; no arguments, environment, or path are leaked into errors.
- Detailed automatic: `.name(AutoName::program_with_args())`, `.name(AutoName::program_with_env_and_args())`, `.name(AutoName::full())` (cwd + env + program + args), or `.name(AutoName::Debug)` for the full `Command` debug representation. Useful in tests; check the redaction story before using in production.
- Custom automatic: `.name(AutoNameSettings::builder()...build())` to opt into a specific subset of fields.
```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // Sketch reconstructed from this README's prose; exact signatures may differ.
    let handle = Process::new(Command::new("echo"))
        // Or a stable explicit label: .name("api-worker")
        .name(AutoName::program_only())
        .stdout(|stream| stream.discard())
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child");

    handle
        .wait_for_completion(Duration::from_secs(5))
        .await
        .expect_completed("child did not exit in time");
}
```
### Automatic termination on drop
A live ProcessHandle that is dropped without a successful terminal wait, terminate(), or kill() runs best-effort
cleanup and then panics. The panic is deliberate: a silent leak of a child process is much harder to detect than a
loud panic in tests, and most "I forgot to wait" bugs would otherwise survive into production.
For long-lived processes that follow a service's own lifecycle, opting in to .terminate_on_drop(...) swaps the panic
for an async termination attempt during drop:
```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // Sketch reconstructed from this README's prose; the exact arguments of
    // terminate_on_drop() may differ.
    let handle = Process::new(Command::new("my-service"))
        .name("service")
        .stdout(|stream| stream.discard())
        .stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn child")
        .terminate_on_drop(Duration::from_secs(3), Duration::from_secs(3));

    // ... use the handle; dropping it now attempts termination instead of panicking.
    drop(handle);
}
```
terminate_on_drop requires a multithreaded Tokio runtime, because drop runs the termination attempt on Tokio's
blocking pool. If the attempt itself fails, the inner handle still falls back to the panic path so the failure is not
silent. Under #[tokio::test], this means opting in to the multi-thread flavor:
```rust
// Sketch; the test body is elided and the function name is illustrative.
#[tokio::test(flavor = "multi_thread")]
async fn spawns_and_terminates_cleanly() {
    // ...
}
```
When the child genuinely needs to outlive the handle (e.g. a daemon you spawned intentionally), call
must_not_be_terminated(). That suppresses both the kill and the panic on drop. The library still drops its owned
stdio, however, so the child loses access to its piped stdin and the parent loses access to its stdout/stderr at the
same moment.
To keep the child and its Stdin/Stdout/Stderr together, use ProcessHandle::into_inner() to take ownership of
all four.
## Platform support
The termination escalation is per-platform but follows the same idea: try the gentle signal first (so the child can
flush, close sockets, persist state), then a stronger signal, then unconditional kill. Pick the timeouts you pass to
terminate() and the _or_terminate helpers based on how long your child legitimately needs at each step. Each step
targets the child's process group rather than its PID alone, so grandchildren are signaled with the leader; see
Subprocess trees for the spawn-time setup that makes this work.
- Linux/macOS: `SIGINT` → `SIGTERM` → `SIGKILL`, all delivered to the child's process group via `killpg`.
- Windows: `CTRL_BREAK_EVENT` to the console process group → `TerminateProcess` on the leader as the kill fallback.
The Windows path uses CTRL_BREAK_EVENT rather than CTRL_C_EVENT because Windows does not allow targeting
CTRL_C_EVENT at a nonzero child process group. A child that only handles Ctrl+C will not respond to either signal,
and needs its own graceful shutdown channel (stdin, IPC, or a command protocol).
Signal sends (send_interrupt_signal, send_terminate_signal, and the equivalent steps inside terminate()) check
for an already-exited child first and return Ok(()) without sending a signal in that case, so calling them after the
child has died is harmless rather than racy.
## Subprocess trees
Every spawned child is set up as the leader of its own process group, so termination signals reach the whole subtree
rather than only the child the library spawned. If your child fork-execs further processes (an npm start that
launches node, a shell wrapper that launches the real binary, a build tool that fans out workers), the grandchildren
inherit the group and are signaled along with the leader by send_interrupt_signal(), send_terminate_signal(),
terminate(), and kill().
- Linux/macOS: the child is spawned with `process_group(0)`, making its PID equal to its PGID. `SIGINT`, `SIGTERM`, and `SIGKILL` are delivered with `killpg`, which targets the whole group.
- Windows: the child is spawned with `CREATE_NEW_PROCESS_GROUP`. `CTRL_BREAK_EVENT` is delivered to that console process group via `GenerateConsoleCtrlEvent`. The forceful-kill fallback (`TerminateProcess`) still targets the leader, so a graceful interrupt window is the right place to let cooperative children unwind.
A grandchild that explicitly detaches itself (setsid/setpgid to leave the group, double-fork into init, or its own
job-object on Windows) is by definition outside the group and will not be signaled. That is correct: such children
have asked to outlive their parent. If you do not own the leaf, and it leaks descendants of its own, run it under a
small reaper (tini is the usual choice) so the descendants have a designated owner.
## MSRV
MSRV is 1.89.0.
## Contributing
Contributions are welcome!
- Create a feature branch.
- Run `just verify` on your changes.
- Don't forget to add a CHANGELOG entry under the `# [Unreleased]` section.
- Open a pull request describing the behavior change and any relevant README updates.
## License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE)
- MIT License (LICENSE-MIT)
at your option.