§tokio-process-tools
A correctness-focused async subprocess library for Tokio. It wraps tokio::process::Command and exposes explicit
controls over resource limits, output consumption, process termination, and cleanup.
Most subprocess code in the wild leaves a handful of decisions implicit:
- What happens when a log consumer falls behind? Is data dropped or is the child blocked?
- Can a stream consumer reliably receive all child startup output even when attached late?
- How much memory will the system need for stream processing? How much for line processing?
- When and how is the child terminated?
- What happens to the child if the parent panics?
Each of those questions may be answered with a hidden default. Some defaults may be fine in isolation, but in aggregate, system behavior becomes dangerously uncertain: silent gaps, leaked children, unbounded buffering.
tokio-process-tools makes every one of those choices a required argument at the process construction call site. There
is no default delivery or replay policy and no default buffer sizes. The cost is a more verbose spawn call; the benefit is
that the choice is visible in code review and to whoever debugs the system at 3 a.m.
§When to use this
Reach for it when you are developing any of:
- a build or development tool that streams live output and keeps a bounded tail for failure reports,
- an integration test that starts a real service and waits until it is ready,
- an orchestration layer that must not leak children after failures or timeouts.
Or when you need any of:
- one stream consumed by more than one subsystem (e.g. logging plus readiness checks),
- bounded memory in the face of untrusted or unknown output volume.
If tokio::process::Command already gives you enough control, this crate may add too much ceremony.
§Core model
The library has three layers, and most of the configuration in this README is about the relationship between them:
- A Process is a spawn configuration. Process::new(command) starts the chain; .spawn() returns a ProcessHandle.
- An output stream is the buffered pipeline that sits between a child’s stdout (or stderr) and your code. Each stream is configured at spawn time (see Choosing stream settings) and emits raw chunks, which consumers parse on the fly.
- A consumer is anything that reads from a stream: an inspector that runs a callback, a collector that retains output for later, a wait_for_line() that resolves on a match. Consumers are implemented through a visitor pattern, allowing custom extensions to be written.
Consumers attach after spawn, so anything the child writes before a consumer is in place is gone unless a replay policy retains it for late arrivals.
A ProcessHandle is “armed” until it is successfully waited, terminated, killed, or explicitly excused with
must_not_be_terminated(). Dropping an armed handle without one of those calls runs best-effort cleanup and then
panics. Better to be loud than to silently leak children.
See Automatic termination on drop for the opt-out.
§Quickstart
Add to your Cargo.toml:
[dependencies]
tokio-process-tools = "0.9.0"
tokio = { version = "1", features = ["macros", "process", "rt-multi-thread"] }
The interactive-stdin example further down uses tokio::io::AsyncWriteExt, which requires the io-util Tokio feature
in your own crate.
The minimum viable spawn: run a command to completion with a deadline and check its exit status. stream.discard()
routes the child’s stdout and stderr to /dev/null at the OS level, so no pipe is allocated and no reader task runs in
the parent. Use this when you only care about whether the process succeeded, not what it wrote.
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut process = Process::new(Command::new("ls"))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| stream.discard())
.spawn()
.expect("failed to spawn command");
let status = process
.wait_for_completion(Duration::from_secs(30))
.await
.unwrap()
.expect_completed("process should complete");
println!("exit status: {status:?}");
}
A discarded stream still implements OutputStream (so process.stdout() and process.stderr() still return
something), but it does not implement TrySubscribable: the consumer-attaching APIs (wait_for_completion_with_output,
inspect_lines, collect_lines, etc.) are not in scope on a handle whose stream is discarded. If you later decide you
do want to read the output, swap discard() for one of the streaming chains described in the next section.
Code throughout this README uses use tokio_process_tools::*; to keep examples short; prefer named imports in
application code. Byte sizes use the NumBytes type, and the NumBytesExt trait (re-exported at the crate root) lets
you write 1.megabytes(), 16.kilobytes(), or 512.bytes() on integer (usize) literals.
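The extension-trait pattern behind NumBytesExt can be sketched in plain Rust. This is an illustrative sketch only; the type layout and the decimal multipliers here are assumptions, not the crate's actual definitions.

```rust
// Illustrative sketch of an extension trait in the style of NumBytesExt.
// The struct layout and decimal multipliers are assumptions for
// illustration, not the crate's actual implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct NumBytes(usize);

trait NumBytesExt {
    fn bytes(self) -> NumBytes;
    fn kilobytes(self) -> NumBytes;
    fn megabytes(self) -> NumBytes;
}

impl NumBytesExt for usize {
    fn bytes(self) -> NumBytes {
        NumBytes(self)
    }
    fn kilobytes(self) -> NumBytes {
        NumBytes(self * 1_000)
    }
    fn megabytes(self) -> NumBytes {
        NumBytes(self * 1_000_000)
    }
}

fn main() {
    // The trait is implemented for usize, so integer literals pick it up.
    assert_eq!(512.bytes(), NumBytes(512));
    assert_eq!(16.kilobytes(), NumBytes(16_000));
    assert_eq!(1.megabytes(), NumBytes(1_000_000));
    println!("ok");
}
```

The payoff of the pattern is readability at the spawn site: a size reads as a unit, not a bare magic number.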
§Choosing stream settings
Each stdout/stderr stream is configured along five axes: backend, delivery, replay, collection, and buffering.
Backend: who consumes the stream?
- broadcast(): Allows multiple concurrent consumers, e.g. logging plus a line waiter.
- single_subscriber(): Exactly one consumer may be active at a time. A second consumer started while another is still running fails immediately with StreamConsumerError::ActiveConsumer. Reach for this when you only ever read the stream from one place at a time; it turns accidental fanout into a loud error at the point of mistake. Once a consumer is dropped, a new one can be registered, just never two at any given time. This implementation may be slightly more performant.
- discard(): No consumption at all. The matching child stdio slot is set to Stdio::null(), so the OS drops the bytes; no pipe is allocated and no reader task runs in the parent. The remaining axes (delivery, replay, buffering) are skipped because they have nothing to act on. Reach for this when only the exit status matters; pair it with a consumed stream on the other slot if you still want one stream’s output (.stdout(|s| s.discard()).stderr(|s| s.broadcast()...) is fine).
Delivery: what happens when a consumer can’t keep up?
- reliable_for_active_subscribers(): When an active consumer’s buffer fills, the child is paused (its next write blocks) instead of dropping data. The guarantee is scoped to consumers that are already attached: a consumer that arrives after the child has produced output gets nothing unless you also configure a replay policy. Use when definitely receiving all output matters more than keeping the child unblocked.
- best_effort_delivery(): Slow consumers may observe gaps. Line consumers drop the in-progress partial line and resync at the next newline. Use when latency matters more than completeness. This never blocks the child.
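The two delivery policies map onto the two ways a bounded queue can respond when it is full: block the producer, or refuse the item and move on. A minimal std-only analogy (the crate's internals use Tokio channels; this is not its implementation):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // A bounded queue with room for two chunks and no consumer draining it.
    let (tx, rx) = sync_channel::<u32>(2);
    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // Reliable-style: a blocking `tx.send(3)` would now pause the producer
    // (the child's next write blocks) until the consumer drains `rx`.

    // Best-effort-style: `try_send` refuses instead of blocking, and the
    // producer moves on; the slow consumer observes a gap.
    match tx.try_send(3) {
        Err(TrySendError::Full(dropped)) => println!("gap: chunk {dropped} dropped"),
        _ => unreachable!("queue has capacity 2 and already holds 2 items"),
    }

    // The consumer still sees everything that was accepted, in order.
    assert_eq!(rx.recv().unwrap(), 1);
    assert_eq!(rx.recv().unwrap(), 2);
}
```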
Replay: what does a consumer attached after spawn see?
- .no_replay(): Live output only. A consumer that attaches after the child has already printed something will not see it.
- .replay_last_bytes(...) / .replay_last_chunks(...) / .replay_all(): The library retains output so a late consumer can start from history. The first two cap retention; .replay_all() is unbounded and should only be used for trusted output volume.
The typical lifecycle for a long-running service is: spawn with a replay policy, attach any consumers you want, then
call seal_output_replay() to release the retained history. The seal applies to future consumers: they start at live
output. Consumers that were already attached when the seal happens are unaffected, and whatever the delivery policy
guaranteed them is still there to read. The per-stream variants seal_stdout_replay() and seal_stderr_replay() cover
the case where one stream’s startup phase ends before the other’s.
Collection: how much memory does the library hold?
Use bounded LineCollectionOptions / RawCollectionOptions for untrusted output. Reserve TrustedUnbounded for
processes whose output volume is known and controlled. The bounded variants take a CollectionOverflowBehavior that
decides what to keep when the cap is reached: DropAdditionalData (default) keeps the first-retained output and
discards anything that arrives after the cap is hit, while DropOldestData keeps the newest output by evicting older
retained data, which gives you a ring-buffer-shaped tail.
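The two overflow behaviors can be pictured as two policies over the same bounded buffer. The sketch below models them with a cap on line count only (the real options also cap bytes), and the names mirror the variants described above:

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy)]
enum OverflowBehavior {
    DropAdditionalData, // keep the first-retained lines, ignore later ones
    DropOldestData,     // evict older lines, keep a tail of the newest
}

fn collect(lines: &[&str], max_lines: usize, behavior: OverflowBehavior) -> Vec<String> {
    let mut kept: VecDeque<String> = VecDeque::new();
    for line in lines {
        if kept.len() < max_lines {
            kept.push_back(line.to_string());
        } else {
            match behavior {
                // Cap hit: anything arriving now is discarded.
                OverflowBehavior::DropAdditionalData => {}
                // Cap hit: evict the oldest entry, a ring-buffer-shaped tail.
                OverflowBehavior::DropOldestData => {
                    kept.pop_front();
                    kept.push_back(line.to_string());
                }
            }
        }
    }
    kept.into()
}

fn main() {
    let lines = ["a", "b", "c", "d"];
    assert_eq!(collect(&lines, 2, OverflowBehavior::DropAdditionalData), ["a", "b"]);
    assert_eq!(collect(&lines, 2, OverflowBehavior::DropOldestData), ["c", "d"]);
    println!("ok");
}
```

For failure reports you usually want the tail (the last lines before the crash), which is why the eviction variant is the ring-buffer-shaped one.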
Buffering: how big a read, how deep a queue?
Every chain ends with two buffering knobs:
- .read_chunk_size(...) is the size of each read() against the child’s pipe. Smaller values let a consumer see partial results from large bursts sooner and cap memory. Larger values reduce per-burst syscall and per-chunk overhead.
- .max_buffered_chunks(...) is the depth of the in-process queue between the reader task and the consumer. Larger values absorb more bursty output before backpressure or dropping kicks in (depending on the delivery policy). Smaller values cap memory.
Pass DEFAULT_READ_CHUNK_SIZE and DEFAULT_MAX_BUFFERED_CHUNKS unless you have measured a reason to tune them. The
defaults (16 KB and 128 chunks) are tuned for general-purpose streaming. Both knobs must be named at the spawn site:
the spelled-out values keep the choice visible in code review and stop accidental mismatches between sites that all
silently rely on a default.
The async line-aware consumers (inspect_lines_async, collect_lines_async, collect_lines_into_write, and friends
that take an AsyncLineSink) materialize every parsed line as an owned String before invoking the per-line callback.
The synchronous line consumers can pass the line as Cow::Borrowed straight out of the chunk on the fast path, so they
avoid the allocation. Prefer the synchronous flavor when per-line work is non-blocking; reach for the async flavor only
when the per-line callback genuinely needs to .await.
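The allocation difference between the two flavors comes down to Cow: a line fully contained in one chunk can be handed out as a borrow, while a callback that may outlive the chunk needs an owned String. A standalone sketch of that distinction (not the crate's actual parser):

```rust
use std::borrow::Cow;

// Fast path: the whole line is a subslice of the current chunk, so it can
// be lent out without allocating. Slow path: the line started in an
// earlier chunk, so the carried-over prefix must be stitched into a String.
fn line_from_chunk<'a>(chunk: &'a str, carry: &str) -> Cow<'a, str> {
    if carry.is_empty() {
        Cow::Borrowed(chunk)
    } else {
        Cow::Owned(format!("{carry}{chunk}"))
    }
}

fn main() {
    // Line contained in one chunk: no allocation.
    let borrowed = line_from_chunk("whole line", "");
    assert!(matches!(borrowed, Cow::Borrowed(_)));

    // Line split across chunks: stitched into an owned String.
    let owned = line_from_chunk("end of line", "start of ");
    assert!(matches!(owned, Cow::Owned(_)));
    assert_eq!(owned, "start of end of line");
    println!("ok");
}
```

An async callback cannot accept the borrowed form in general, because the chunk it borrows from may be recycled while the callback is suspended at an .await point; that is why the async flavors materialize owned Strings.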
§Consumers
tokio-process-tools provides the following consumers out of the box:
- inspect_lines / inspect_chunks (and their _async variants) run a callback per item and discard the data. Reach for these when only the side effect matters: forward to tracing, count occurrences, react to a milestone.
- collect_lines / collect_chunks (and _async variants) retain output in memory for later inspection. Reach for these when a test or caller wants to assert on what the child wrote after it has finished and wait_for_completion_with_output doesn’t fit.
- collect_lines_into_write / collect_chunks_into_write (plus _mapped variants) forward output to an async writer (a file, a TCP sink, anything AsyncWrite). WriteCollectionOptions controls writer-failure handling and LineWriteMode controls whether parsed lines get the stripped \n reattached. See Stream output to a sink for an end-to-end example.
See the Custom consumer / stream visitor example for guidance on how to build your own consumers.
§Consumer lifecycle
Each Consumer<S> handle owns the spawned task. Dropping the handle drops the task, which cancels the consumer
immediately. Bind it to a variable, e.g. let _consumer = process.stdout().inspect_lines(...), to silence the
unused-result warning (if you don’t intend to interact with the handle). The #[must_use] annotation on every
consumer-creator is the compiler’s reminder of this contract. If your process is long-lived, store consumer
handles somewhere so that they are not dropped.
A Consumer exposes three ways to finish. The choice is a trade-off between recovering state (the writer, the
collected output) and not hanging:
- wait() lets the consumer drain naturally. It returns when the stream closes (the child exited and any retained replay has been read out) or the visitor returned Next::Break. Use this when there is a natural endpoint and you want whatever the consumer produced; this is the path that gives you the writer back from a collect_*_into_write consumer.
- cancel(timeout) is the bounded cooperative-then-forceful shutdown. The consumer task is signaled to stop at the next safe point. If it observes the cancellation and exits before timeout elapses, you get ConsumerCancelOutcome::Cancelled(sink) with the sink preserved. If timeout elapses first, the task is aborted and you get ConsumerCancelOutcome::Aborted with the sink dropped. Pick the timeout based on how long the consumer might legitimately need to flush.
- abort() is unconditional forceful shutdown. The task’s in-flight future is dropped, which means sinks and writers are not returned (you cannot recover the file handle, the partially collected Vec, etc.). Reach for this when not getting stuck matters more than recovering state and you do not want to pay any timeout at all.
§Examples
Patterns past the basic spawn-and-wait shown in the Quickstart.
§Wait and collect output
Like the Quickstart spawn, but the stdout/stderr that appeared during the run is captured and returned. The collector
here is attached inside the wait helper, after the child has already started producing output, which is why the stream
is configured with replay_last_bytes(...). Without that policy, anything the child wrote before the helper attached
the collector would be lost (see Core model). The wait timeout covers both process exit and the final
drain of stdout/stderr.
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut process = Process::new(Command::new("ls"))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.single_subscriber()
.best_effort_delivery()
.replay_last_bytes(1.megabytes())
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.expect("failed to spawn command");
let line_collection_options = LineCollectionOptions::Bounded {
max_bytes: 1.megabytes(),
max_lines: 1024,
overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
};
let ProcessOutput {
status,
stdout,
stderr,
} = process
.wait_for_completion_with_output(
Duration::from_secs(30),
LineOutputOptions {
line_parsing_options: LineParsingOptions::default(),
stdout_collection_options: line_collection_options,
stderr_collection_options: line_collection_options,
},
)
.await
.unwrap()
.expect_completed("process should complete");
println!("exit status: {status:?}");
println!("stdout lines: {:?}", stdout.lines());
println!("stderr lines: {:?}", stderr.lines());
}
Use wait_for_completion_with_raw_output() when the child’s output is not UTF-8 line-oriented: binary blobs (image
conversion, archive tools), framed protocols, or anything where line parsing would corrupt or lose bytes. The shape of
the call is the same; substitute RawOutputOptions for LineOutputOptions, RawCollectionOptions for
LineCollectionOptions, and read the result as CollectedBytes { bytes, truncated } on each stream. No
LineParsingOptions is involved. RawCollectionOptions offers the same Bounded { max_bytes, overflow_behavior }
and TrustedUnbounded variants as the line equivalent.
§Wait for readiness from output
The classic “start a service in an integration test and wait until it logs that it is ready” pattern. wait_for_line()
is itself a stream consumer with a required timeout, so under single_subscriber() it competes with anything else
reading the stream. Reach for broadcast() whenever readiness detection has to coexist with a logger. Pair it with
replay retention so a waiter attached after spawn can still see startup lines, then seal_output_replay() to free that
history once the service is up.
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut process = Process::new(Command::new("my-web-server"))
.name("api")
.stdout_and_stderr(|stream| {
stream
.broadcast()
.reliable_for_active_subscribers()
.replay_last_bytes(1.megabytes())
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.expect("failed to spawn server");
let stdout = process.stdout();
let _logs = stdout.inspect_lines(
|line| {
eprintln!("{line}");
Next::Continue
},
LineParsingOptions::default(),
);
match stdout
.wait_for_line(
Duration::from_secs(30),
|line| line.contains("Server listening on"),
LineParsingOptions::default(),
)
.await
{
Ok(WaitForLineResult::Matched) => {}
Ok(WaitForLineResult::StreamClosed) => panic!("server exited before becoming ready"),
Ok(WaitForLineResult::Timeout) => panic!("server did not become ready in time"),
Err(err) => panic!("failed to read stdout: {err}"),
}
process.seal_output_replay();
let _ = process
.terminate(Duration::from_secs(3), Duration::from_secs(5))
.await;
}
§Interactive stdin
Children always have stdin piped (there is no opt-out), but the library only flushes/closes the pipe when you ask.
Calling stdin().close() lets a child observe EOF before any wait helper runs (some programs only finish on EOF, not
on idle). The terminal wait helpers and kill() close any still-open stdin automatically, so the manual close is only
needed when the child must see EOF before the wait.
process.stdin() returns a &mut Stdin, an enum with Open(ChildStdin) and Closed variants. Stdin::as_mut()
projects that into Option<&mut ChildStdin>, returning None once the slot has transitioned to Closed (either
explicitly via stdin().close() or implicitly because a wait helper or kill() has already cleaned it up). The
example below pattern-matches with if let Some(stdin) = process.stdin().as_mut() for that reason. The None branch
has no other failure mode in the stdin-is-always-piped contract.
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let line_collection = LineCollectionOptions::Bounded {
max_bytes: 1.megabytes(),
max_lines: 1024,
overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
};
let mut process = Process::new(Command::new("cat"))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.single_subscriber()
.best_effort_delivery()
.replay_last_bytes(1.megabytes())
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.expect("failed to spawn command");
if let Some(stdin) = process.stdin().as_mut() {
stdin.write_all(b"hello\n").await.unwrap();
stdin.write_all(b"world\n").await.unwrap();
stdin.flush().await.unwrap();
}
process.stdin().close();
let output = process
.wait_for_completion_with_output(
Duration::from_secs(30),
LineOutputOptions {
line_parsing_options: LineParsingOptions::default(),
stdout_collection_options: line_collection,
stderr_collection_options: line_collection,
},
)
.await
.unwrap()
.expect_completed("process should complete");
println!("stdout lines: {:?}", output.stdout.lines());
}
§Stream output to a sink
When the child will produce more output than is reasonable to hold in memory (build tools, long-running servers in CI
logs, or anything you want on disk rather than in Vec<String>), point a collector at an AsyncWrite sink instead.
The example below tees stdout into a file. Parsed lines are written one per call, with LineWriteMode::AppendLf
re-attaching the trailing newline that line parsing strips. WriteCollectionOptions::log_and_continue() keeps
collection running if the file write ever fails (disk full, broken pipe to a piped writer, etc.); use fail_fast()
instead when a sink failure should stop the collection.
The collector is yours to drive. After the child exits the stream closes, the visitor drains, and wait() returns
the file handle so you can flush or close it deterministically.
This example needs the fs Tokio feature in your own crate, in addition to the features listed in the
Quickstart.
use std::time::Duration;
use tokio::fs::File;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut process = Process::new(Command::new("my-noisy-tool"))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.broadcast()
.reliable_for_active_subscribers()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.expect("failed to spawn command");
let log_file = File::create("/tmp/output.log")
.await
.expect("failed to open log file");
let log_collector = process.stdout().collect_lines_into_write(
log_file,
LineParsingOptions::default(),
LineWriteMode::AppendLf,
WriteCollectionOptions::log_and_continue(),
);
let _status = process
.wait_for_completion(Duration::from_secs(60))
.await
.unwrap()
.expect_completed("process should complete");
// Stream is closed now that the child has exited; the collector drains. The outer
// `Result` reflects consumer-infrastructure outcomes (task join, stream read), the
// inner one reflects whether the writer accepted every byte; with
// `log_and_continue()` write failures are logged so the inner result is `Ok` here.
let _log_file = log_collector.wait().await.unwrap().unwrap();
}
§Timeout and Cleanup
For processes that may hang, wait_for_completion_or_terminate() rolls the “interrupt → terminate → kill”
escalation into the wait so you don’t have to sequence cleanup yourself. The three durations correspond to: how long
to wait for a clean exit, how long to give the child after the interrupt signal, and how long to give it after the
terminate signal before falling back to kill().
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut process = Process::new(Command::new("potentially-hanging-process"))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.broadcast()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.expect("failed to spawn command");
match process
.wait_for_completion_or_terminate(WaitForCompletionOrTerminateOptions {
wait_timeout: Duration::from_secs(30),
interrupt_timeout: Duration::from_secs(3),
terminate_timeout: Duration::from_secs(5),
})
.await
{
Ok(WaitForCompletionOrTerminateResult::Completed(status)) => {
println!("completed with status: {status:?}");
}
Ok(WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
result,
timeout,
}) => {
println!("terminated after waiting {timeout:?}; status: {result:?}");
}
Err(err) => eprintln!("wait or cleanup failed: {err}"),
}
}
§Custom consumer / stream visitor
When the built-in inspect_* and collect_* consumers don’t fit, implement StreamVisitor (sync) or
AsyncStreamVisitor (async) directly and attach it with consume_with / consume_with_async. The cases that pay
for a custom visitor are: maintaining state across chunks, reacting explicitly to gap notifications, or producing an
output type the built-ins don’t expose. If all you need is a side effect per chunk or per line, reach for
inspect_chunks / inspect_lines; the closure form is simpler.
Pick StreamVisitor when on_chunk can return without .await-ing (counters, in-memory state machines, channel
try_send, simple synchronous I/O). Pick AsyncStreamVisitor when chunk handling needs to await (network sends,
async writers, channel send with backpressure). Both traits require Send + 'static, and both return their final
value via into_output(self) at the end of the consumer task’s life.
The example below tracks how many bytes and chunks the stream produced, plus how many gap events the backend reported.
on_gap is invoked when one or more chunks were dropped between the previous and next observed chunk (only possible
under best_effort_delivery() overflow), and visitors that splice bytes across chunks should treat it as a reset
signal.
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;
#[derive(Debug, Default)]
struct ChunkStats {
bytes: usize,
chunks: usize,
gaps: usize,
}
impl StreamVisitor for ChunkStats {
type Output = ChunkStats;
fn on_chunk(&mut self, chunk: Chunk) -> Next {
self.bytes += chunk.as_ref().len();
self.chunks += 1;
Next::Continue
}
fn on_gap(&mut self) {
self.gaps += 1;
}
fn into_output(self) -> ChunkStats {
self
}
}
#[tokio::main]
async fn main() {
let mut process = Process::new(Command::new("ls"))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.single_subscriber()
.best_effort_delivery()
.replay_last_bytes(1.megabytes())
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.expect("failed to spawn command");
let counter = process
.stdout()
.consume_with(ChunkStats::default())
.expect("no other consumer is attached yet");
let _status = process
.wait_for_completion(Duration::from_secs(30))
.await
.unwrap()
.expect_completed("process should complete");
let stats = counter.wait().await.unwrap();
println!(
"stdout: {} bytes across {} chunks ({} gaps)",
stats.bytes, stats.chunks, stats.gaps,
);
}
A few lifecycle quirks are worth knowing. on_gap() is synchronous in both traits because gap notifications carry no
data to await on. on_eof() is skipped when on_chunk returned Next::Break, on the assumption that an
early-break visitor already has its result; cancellation and abort skip on_eof for the same reason. Whichever exit
path was taken, into_output always runs, so Consumer::wait() and cancel() always have a value to yield.
§Further APIs
The Quickstart and Examples cover the common path. The rest of the surface is small but purposeful. Here is when you would reach for each piece.
§Wait helper variants
wait_for_completion_with_output() and friends come in four combinations: line vs. raw bytes, and plain wait vs.
wait-or-terminate. Pick the variant that matches what you need:
- Line vs. raw. Line collection parses on the fly and gives you Vec<String>-style output; raw collection gives you the bytes the child actually wrote, which matters when output isn’t UTF-8 line-oriented.
- Plain vs. _or_terminate. The _or_terminate variants embed the same interrupt → terminate → kill escalation shown in Timeout and Cleanup, so a hung child still gets cleaned up before the helper returns.
Each plain wait helper returns a WaitForCompletionResult (Completed or Timeout); the _or_terminate variants
return WaitForCompletionOrTerminateResult (Completed or TerminatedAfterTimeout). The examples above use
.expect_completed(msg) to panic on the non-completed branch when a timeout would be a hard failure; reach for
.into_completed() instead when the caller should handle both outcomes itself.
When you need output collection and hung-child cleanup together, reach for the combined
wait_for_completion_with_output_or_terminate() (or its _with_raw_output_or_terminate() sibling). It takes the
same LineOutputOptions / RawOutputOptions you would pass to _with_output, plus the three durations you would
pass to _or_terminate, with no extra ceremony.
For trusted, bounded-by-construction output, both LineCollectionOptions and RawCollectionOptions have a
TrustedUnbounded variant that opts out of memory caps.
§Line parsing
LineParsingOptions controls how chunks are split into lines and what happens at the limits.
wait_for_line() takes the same LineParsingOptions so its matcher sees the same lines a collector would. Three
fields:
- max_line_length (default 16 KB) caps a single line. It must be greater than zero; pass NumBytes::MAX explicitly to opt into unbounded line parsing on a trusted source.
- LineOverflowBehavior decides what happens past the cap: DropAdditionalData discards bytes until the next newline (default), and EmitAdditionalAsNewLines splits the long line into successive emitted lines so no data is lost.
- buffer_compaction_threshold (default None) is a steady-state memory knob. The parser’s two internal buffers retain capacity for whichever was the largest line they have ever held; setting Some(n) forces buffers above n to be released between lines. Useful with mostly-small lines and occasional outliers (especially under NumBytes::MAX), counterproductive when typical lines are already near the threshold.
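The effect of the two overflow behaviors on a single over-long line can be sketched as fixed-size splitting. Illustrative only, assuming a byte cap and ASCII input so byte-based splitting cannot break UTF-8:

```rust
// Sketch of the two line-overflow behaviors for one over-long line.
// Assumes ASCII input; the real parser has to respect UTF-8 boundaries.
fn drop_additional(line: &str, max: usize) -> Vec<&str> {
    // Keep the first `max` bytes, discard the rest until the next newline.
    vec![&line[..line.len().min(max)]]
}

fn emit_additional_as_new_lines(line: &str, max: usize) -> Vec<&str> {
    // Split the long line into successive emitted lines; nothing is lost.
    line.as_bytes()
        .chunks(max)
        .map(|c| std::str::from_utf8(c).expect("ASCII input"))
        .collect()
}

fn main() {
    let long = "abcdefgh"; // imagine max_line_length = 3
    assert_eq!(drop_additional(long, 3), ["abc"]);
    assert_eq!(emit_additional_as_new_lines(long, 3), ["abc", "def", "gh"]);
    println!("ok");
}
```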
When chunks drop under best_effort_delivery(), line parsers conservatively discard the in-progress partial line and
resynchronize at the next newline rather than splicing across the gap. Callers should not assume that a missing
newline after a gap means “the next line continues the previous one”; treat each post-gap line as fresh.
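The resynchronization rule boils down to: after a gap, discard bytes up to and including the next newline before resuming line parsing. A minimal sketch of that rule (not the crate's parser):

```rust
// After a gap, the partial line in progress is discarded and parsing
// resumes at the first byte after the next newline in the stream.
fn resync_after_gap(chunk: &str) -> &str {
    match chunk.find('\n') {
        Some(idx) => &chunk[idx + 1..], // fresh lines start here
        None => "",                     // still mid-line: keep discarding
    }
}

fn main() {
    // This chunk arrives right after dropped chunks; "tial line" is the
    // tail of some line whose beginning was lost in the gap.
    assert_eq!(resync_after_gap("tial line\nnext line\n"), "next line\n");

    // No newline yet: everything is still part of the torn line.
    assert_eq!(resync_after_gap("no newline yet"), "");
    println!("ok");
}
```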
§Manual process control
When the wait-and-cleanup helpers do not fit, drive shutdown by hand: send_interrupt_signal(),
send_terminate_signal(), terminate() (escalation up to kill), and kill() (immediate). Each returns a typed
TerminationError describing which step failed. The signal-send methods are idempotent against a child that has
already exited: they reap the status and return Ok(()) without sending a signal at a possibly-recycled PID.
seal_stdout_replay(), seal_stderr_replay(), and seal_output_replay() end replay retention on one stream or
both.
§Process management
§Process naming
A process name is required before stdout/stderr can be configured. The name appears in error messages and tracing spans, and the library refuses to spawn anonymous processes because anonymous failures are hard to debug.
For production workflows, prefer a stable explicit label ("api-worker", format!("agent-{id}")) so logs and
metrics group cleanly. The AutoName variants are convenient for ad-hoc and test code, but the variants that
include arguments, environment, or current directory will surface those values in error messages, so only use them
when those values are safe to log.
Canonical naming forms:
- Explicit label: .name("api-worker") or .name(format!("agent-{id}")). The recommended default.
- Safe automatic: .name(AutoName::program_only()). Uses just the program name; no arguments, environment, or path are leaked into errors.
- Detailed automatic: .name(AutoName::program_with_args()), .name(AutoName::program_with_env_and_args()), .name(AutoName::full()) (cwd + env + program + args), or .name(AutoName::Debug) for the full Command debug representation. Useful in tests; check the redaction story before using in production.
- Custom automatic: .name(AutoNameSettings::builder()...build()) to opt into a specific subset of fields.
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let id = 42;
let mut process = Process::new(Command::new("true"))
.name(format!("agent-{id}"))
.stdout_and_stderr(|stream| {
stream
.single_subscriber()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
process
.wait_for_completion(Duration::from_secs(30))
.await
.unwrap()
.expect_completed("process should complete");
}
§Automatic termination on drop
A live ProcessHandle that is dropped without a successful terminal wait, terminate(), or kill() runs best-effort
cleanup and then panics. The panic is deliberate: a silent leak of a child process is much harder to detect than a
loud panic in tests, and most “I forgot to wait” bugs would otherwise survive into production.
For long-lived processes that follow a service’s own lifecycle, opting in to .terminate_on_drop(...) swaps the panic
for an async termination attempt during drop:
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let _process = Process::new(Command::new("some-command"))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.broadcast()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap()
.terminate_on_drop(Duration::from_secs(3), Duration::from_secs(5));
}
terminate_on_drop requires a multithreaded Tokio runtime, because drop runs the termination attempt on Tokio’s
blocking pool. If the attempt itself fails, the inner handle still falls back to the panic path so the failure is not
silent. Under #[tokio::test], this means opting in to the multi-thread flavor:
#[tokio::test(flavor = "multi_thread")]
async fn test() {
// ...
}
When the child genuinely needs to outlive the handle (e.g. a daemon you spawned intentionally), call
must_not_be_terminated(). That suppresses both the kill and the panic on drop. The library still drops its owned
stdio, however, so the child loses access to its piped stdin and the parent loses access to its stdout/stderr at the
same moment.
To keep the child and its Stdin/Stdout/Stderr together, use ProcessHandle::into_inner() to take ownership of
all four.
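For illustration, a hedged sketch of the daemon case. The builder calls mirror the earlier examples; `must_not_be_terminated()` is assumed here to take the handle by value, which may differ from the actual signature, and `some-daemon` is a placeholder program name:

```rust
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let handle = Process::new(Command::new("some-daemon"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .unwrap();

    // Suppress both the kill and the panic on drop. The handle's owned
    // stdio is still dropped here, so the daemon must not depend on the
    // piped stdin/stdout/stderr after this point.
    handle.must_not_be_terminated();
}
```

If the daemon does need the pipes, reach for ProcessHandle::into_inner() instead and keep ownership of the child and its stdio together.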
§Platform support
The termination escalation is per-platform but follows the same idea: try the gentle signal first (so the child can
flush, close sockets, persist state), then a stronger signal, then unconditional kill. Pick the timeouts you pass to
terminate() and the _or_terminate helpers based on how long your child legitimately needs at each step. Each step
targets the child’s process group rather than its PID alone, so grandchildren are signaled with the leader; see
Subprocess trees for the spawn-time setup that makes this work.
- Linux/macOS: SIGINT → SIGTERM → SIGKILL, all delivered to the child’s process group via killpg.
- Windows: CTRL_BREAK_EVENT to the console process group → TerminateProcess on the leader as the kill fallback.
The Windows path uses CTRL_BREAK_EVENT rather than CTRL_C_EVENT because Windows does not allow targeting
CTRL_C_EVENT at a nonzero child process group. A child that only handles Ctrl+C will not respond to either signal,
and needs its own graceful shutdown channel (stdin, IPC, or a command protocol).
Signal sends (send_interrupt_signal, send_terminate_signal, and the equivalent steps inside terminate()) check
for an already-exited child first and return Ok(()) without sending a signal in that case, so calling them after the
child has died is harmless rather than racy.
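Because of that early-exit check, a shutdown path can send the gentle signal unconditionally. A sketch, assuming send_interrupt_signal is an async method returning a Result (whether these sends are sync or async is an assumption here):

```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut process = Process::new(Command::new("true"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .unwrap();

    // `true` may well have exited already; the send still returns Ok(())
    // instead of racing against the dead child.
    process.send_interrupt_signal().await.unwrap();

    // A terminal wait observes whichever exit actually happened.
    process
        .wait_for_completion(Duration::from_secs(10))
        .await
        .unwrap();
}
```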
§Subprocess trees
Every spawned child is set up as the leader of its own process group, so termination signals reach the whole subtree
rather than only the child the library spawned. If your child fork-execs further processes (an npm start that
launches node, a shell wrapper that launches the real binary, a build tool that fans out workers), the grandchildren
inherit the group and are signaled along with the leader by send_interrupt_signal(), send_terminate_signal(),
terminate(), and kill().
- Linux/macOS: the child is spawned with process_group(0), making its PID equal to its PGID. SIGINT, SIGTERM, and SIGKILL are delivered with killpg, which targets the whole group.
- Windows: the child is spawned with CREATE_NEW_PROCESS_GROUP. CTRL_BREAK_EVENT is delivered to that console process group via GenerateConsoleCtrlEvent. The forceful-kill fallback (TerminateProcess) still targets the leader, so a graceful interrupt window is the right place to let cooperative children unwind.
A grandchild that explicitly detaches itself (setsid/setpgid to leave the group, double-fork into init, or its own
job-object on Windows) is by definition outside the group and will not be signaled. That is correct: such children
have asked to outlive their parent. If you do not own the leaf, and it leaks descendants of its own, run it under a
small reaper (tini is the usual choice) so the descendants have a designated owner.
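To see the group semantics end to end, a sketch that spawns a shell which backgrounds a grandchild, then terminates both through the group. This assumes terminate() takes the same two graceful-window Durations as terminate_on_drop; the exact signature is an assumption:

```rust
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    // The shell fork-execs `sleep`; both live in the same process group.
    let mut cmd = Command::new("sh");
    cmd.arg("-c").arg("sleep 300 & wait");

    let mut process = Process::new(cmd)
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .unwrap();

    // The escalation (SIGINT → SIGTERM → SIGKILL) targets the group, so the
    // backgrounded sleep is signaled along with the shell leader.
    process
        .terminate(Duration::from_secs(2), Duration::from_secs(2))
        .await
        .unwrap();
}
```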
§MSRV
MSRV is 1.89.0.
§Contributing
Contributions are welcome!
- Create a feature branch.
- Run just verify on your changes.
- Don’t forget to add a CHANGELOG entry under the # [Unreleased] section.
- Open a pull request describing the behavior change and any relevant README updates.
§License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE)
- MIT License (LICENSE-MIT)
at your option.
Structs§
- AutoNameSettings - Controls in detail which parts of the command are automatically captured as the process name.
- BestEffortDelivery - Best-effort stream delivery marker.
- BroadcastOutputStream - The output stream from a process using a multi-consumer broadcast backend.
- Chunk - A “chunk” is an arbitrarily sized byte slice read from the underlying stream. The slices’ length is at most the previously configured maximum chunk_size.
- CollectLineSink - LineSink holding the user closure and a sink; on_line calls the closure with the line and a &mut borrow of the sink. Compose with LineAdapter to drive collect_lines, or to build your own custom collect-lines consumer outside the built-in factory methods.
- CollectLineSinkAsync - AsyncLineSink holding the user collector and a sink. Compose with LineAdapter (its AsyncStreamVisitor impl is selected automatically when the inner sink is an AsyncLineSink) to drive collect_lines_async.
- CollectedBytes - Raw bytes collected from an output stream.
- CollectedLines - Parsed lines collected from an output stream.
- Consumer - A handle for a tokio task that consumes a stream by driving a visitor over its events.
- DiscardedOutputStream - Marker stream for a stdio slot configured with std::process::Stdio::null().
- DiscardedStreamConfig - Configuration produced by ProcessStreamBuilder::discard. The matching child stdio slot is set to Stdio::null(), so the OS discards the bytes; no pipe is allocated and no reader task runs in the parent.
- InspectLineSink - LineSink wrapping a per-line closure. Compose with LineAdapter to drive inspect_lines, or to build your own custom inspect-lines consumer outside the built-in factory methods.
- InspectLineSinkAsync - AsyncLineSink wrapping a per-line async closure. Compose with LineAdapter (its AsyncStreamVisitor impl is selected automatically when the inner sink is an AsyncLineSink) to drive inspect_lines_async. The PhantomData<fn() -> Fut> carries the future’s type onto the struct so callers never name Fut explicitly.
- LineAdapter - Adapter that drives LineParser over chunk events and dispatches every emitted line to the inner LineSink (sync) or AsyncLineSink (async).
- LineOutputOptions - Options for line output collection from stdout and stderr.
- LineParser - Stateful parser for turning arbitrary byte chunks into lines.
- LineParsingOptions - Configuration options for parsing lines from a stream.
- NoReplay - Marker for streams without replay support.
- NumBytes - A wrapper type representing a number of bytes.
- Process - Typestate builder for configuring and spawning a process.
- ProcessHandle - A handle to a spawned process with captured stdout/stderr streams.
- ProcessOutput - Full output of a process that terminated.
- ProcessStreamBuilder - Builder for selecting the output stream backend for one process stream.
- RawOutputOptions - Options for raw byte output collection from stdout and stderr.
- ReliableDelivery - Reliable active-subscriber stream delivery marker.
- ReplayEnabled - Marker for streams with replay support.
- SingleSubscriberOutputStream - The output stream from a process. Either representing stdout or stderr.
- SinkWriteError - Details about a failed async write into a collector sink.
- StreamConfig - Shared output stream configuration for all stream backends.
- StreamConfigBuilder - Initial builder stage that requires selecting delivery behavior.
- StreamReadError - Error emitted when an output stream cannot be read to completion.
- TerminateOnDrop - A wrapper that automatically terminates a process when dropped.
- TerminationAttemptError - A failed operation recorded while attempting to terminate a process.
- WaitForCompletionOrTerminateOptions - Options for waiting until a process exits, terminating it if waiting fails.
- WaitForLineSink - LineSink that breaks the moment a predicate accepts a line and remembers whether it has matched yet. Compose with LineAdapter to drive wait_for_line, or to build your own custom predicate-driven consumer outside the built-in factory methods.
- WriteCollectionOptions - Options for forwarding collected stream output into an async writer.
- WriteLineSink - AsyncLineSink that maps each parsed line through mapper, writes the result via writer, and routes failures through error_handler. Compose with LineAdapter (its AsyncStreamVisitor impl is selected automatically when the inner sink is an AsyncLineSink) to drive collect_lines_into_write and friends, or to build your own custom write-lines consumer outside the built-in factory methods.
Enums§
- AutoName - Controls how the process name is automatically generated when not explicitly provided.
- CollectionOverflowBehavior - Controls which output is retained once a bounded in-memory collection reaches its limit.
- ConsumerCancelOutcome - The result of Consumer::cancel.
- ConsumerError - Errors that the Consumer infrastructure itself can raise while driving its stream.
- DeliveryGuarantee - Runtime delivery behavior used by typed stream modes.
- LineCollectionOptions - Options for collecting parsed output lines into memory.
- LineOverflowBehavior - What should happen when a line is too long?
- LineWriteMode - Controls how line-based write helpers delimit successive lines.
- Next - Control flag to indicate whether processing should continue or break.
- ProcessName - Specifies how a process should be named.
- RawCollectionOptions - Options for collecting raw output bytes into memory.
- ReplayRetention - Replay history retained by replay-enabled streams.
- RunningState - Represents the running state of a process.
- SinkWriteErrorAction - Action to take after an async writer sink rejects collected output.
- SinkWriteOperation - The write operation that failed while forwarding collected output into an async writer.
- SpawnError - Errors that can occur when spawning a process.
- Stdin - Represents the stdin stream of a child process.
- StreamConsumerError - Errors that can occur when creating a stream consumer.
- StreamEvent - Event emitted by an output stream backend.
- TerminationAttemptOperation - Termination operation that failed.
- TerminationAttemptPhase - Termination phase where an attempt error was recorded.
- TerminationError - Errors that can occur when terminating a process.
- WaitError - Errors that can occur when waiting for process operations.
- WaitForCompletionOrTerminateResult - Result of waiting for a process to complete, terminating it if the wait times out.
- WaitForCompletionResult - Result of waiting for a process to complete within an explicit timeout.
- WaitForLineResult - Result of waiting for an output line matching a predicate.
- WaitOrTerminateError - Errors that can occur when waiting for a process with automatic termination on failure.
- WaitWithOutputError - Errors that can occur when waiting for a process while collecting its output, with or without automatic termination on timeout.
Constants§
- DEFAULT_MAX_BUFFERED_CHUNKS - Default maximum buffered chunks for stdout and stderr streams. 128 slots.
- DEFAULT_MAX_LINE_LENGTH - Default maximum line length used by LineParsingOptions::default. 16 kilobytes.
- DEFAULT_READ_CHUNK_SIZE - Default chunk size read from the source stream. 16 kilobytes.
Traits§
- AsyncChunkCollector - An async collector for raw output chunks.
- AsyncLineCollector - An async collector for parsed output lines.
- AsyncLineSink - Async per-line action selected by the AsyncStreamVisitor impl of LineAdapter.
- AsyncStreamVisitor - An asynchronous visitor that observes stream events and produces a final value.
- Delivery - Marker trait implemented by supported stream delivery marker types.
- LineSink - Per-line action selected by the StreamVisitor impl of LineAdapter.
- NumBytesExt - Extension trait providing convenience constructors for NumBytes.
- OutputStream - We support the following implementations:
- ProcessStreamConfig - Marker trait for process stream builder configurations.
- Replay - Marker trait implemented by supported stream replay marker types.
- Sink - A trait for types that can act as sinks for collected stream data.
- SinkWriteErrorHandler - Handles async writer sink failures observed by writer collectors.
- StreamVisitor - A synchronous visitor that observes stream events and produces a final value.
- Subscription - Stream event subscription used by built-in consumers.
- TrySubscribable - Output stream backend that can reject consumer subscriptions.