Skip to main content

Crate tokio_process_tools

Crate tokio_process_tools 

Source
Expand description

§tokio-process-tools

A correctness-focused async subprocess library for Tokio. It wraps tokio::process::Command and exposes explicit controls over resource limits, output consumption, process termination, and cleanup.

Most subprocess code in the wild leaves a handful of decisions implicit:

  • What happens when a log consumer falls behind? Is data dropped or is the child blocked?
  • Can a stream consumer reliably receive all child startup output even when attached late?
  • How much memory will the system need for stream processing? How much for line processing?
  • When and how is the child terminated?
  • What happens to the child if the parent panics?

Each of those questions may be answered with a hidden default. Some may be fine in isolation. But in aggregate, system behavior becomes dangerously uncertain. The result can be: silent gaps, leaked children, unbounded buffering.

tokio-process-tools makes every one of those choices a required argument at the process construction call site. There is no default delivery or replay policy, no default buffer sizes. The cost is a more verbose spawn call, the benefit is that the choice is visible in code review and to whoever debugs the system at 3am.

§When to use this

Reach for it when you are developing any of:

  • a build or development tool that streams live output and keeps a bounded tail for failure reports,
  • an integration test that starts a real service and waits until it is ready,
  • an orchestration layer that must not leak children after failures or timeouts,

Or when you need any of:

  • one stream consumed by more than one subsystem (e.g. logging plus readiness checks),
  • bounded memory in the face of untrusted or unknown output volume.

If tokio::process::Command already gives you enough control, this crate may add too much ceremony.

§Core model

The library has three layers, and most of the configuration in this README is about the relationship between them:

  • A Process is a spawn configuration. Process::new(command) starts the chain, .spawn() returns a ProcessHandle.
  • An output stream is the buffered pipeline that sits between a child’s stdout (or stderr) and your code. Each stream is configured at spawn time (see Choosing stream settings) and emits raw chunks, which consumers parse on the fly.
  • A consumer is anything that reads from a stream: an inspector that runs a callback, a collector that retains output for later, a wait_for_line() that resolves on a match. Consumers are implemented through a visitor pattern, allowing for custom extensions to be written.

Consumers attach after spawn, so anything the child writes before a consumer is in place is gone unless a replay policy retains it for late arrivals.

A ProcessHandle is “armed” until it is successfully waited, terminated, killed, or explicitly excused with must_not_be_terminated(). Dropping an armed handle without one of those calls runs best-effort cleanup and then panics. Better to be loud than to silently leak children. See Automatic termination on drop for the opt-out.

§Quickstart

Add to your Cargo.toml:

[dependencies]
tokio-process-tools = "0.9.0"
tokio = { version = "1", features = ["macros", "process", "rt-multi-thread"] }

The interactive-stdin example further down uses tokio::io::AsyncWriteExt, which requires the io-util Tokio feature in your own crate.

The minimum viable spawn: run a command to completion with a deadline and check its exit status. stream.discard() routes the child’s stdout and stderr to /dev/null at the OS level, so no pipe is allocated and no reader task runs in the parent. Use this when you only care about whether the process succeeded, not what it wrote.

use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut process = Process::new(Command::new("ls"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| stream.discard())
        .spawn()
        .expect("failed to spawn command");

    let status = process
        .wait_for_completion(Duration::from_secs(30))
        .await
        .unwrap()
        .expect_completed("process should complete");

    println!("exit status: {status:?}");
}

A discarded stream still implements OutputStream (so process.stdout() and process.stderr() still return something), but it does not implement TrySubscribable: the consumer-attaching APIs (wait_for_completion_with_output, inspect_lines, collect_lines, etc.) are not in scope on a handle whose stream is discarded. If you later decide you do want to read the output, swap discard() for one of the streaming chains described in the next section.

Code throughout this README uses use tokio_process_tools::*; to keep examples short; prefer named imports in application code. Byte sizes use the NumBytes type, and the NumBytesExt trait (re-exported at the crate root) lets you write 1.megabytes(), 16.kilobytes(), or 512.bytes() on integer (usize) literals.

§Choosing stream settings

Each stdout/stderr stream is configured along five axes: backend, delivery, replay, collection, and buffering.

Backend: who consumes the stream?

  • broadcast(): Allows for multiple concurrent consumers, e.g. logging plus a line waiter.
  • single_subscriber(): Exactly one consumer is allowed to be active at a time. A second consumer started while another is still running fails immediately with StreamConsumerError::ActiveConsumer. Reach for this when you only ever read the stream from one place at a time. It turns accidental fanout into a loud error at the point of mistake. Once a consumer is dropped again, a new one can be registered. Just never two at any given time. This implementation may be slightly more performant.
  • discard(): No consumption at all. The matching child stdio slot is set to Stdio::null(), so the OS drops the bytes; no pipe is allocated and no reader task runs in the parent. The remaining axes (delivery, replay, buffering) are skipped because they have nothing to act on. Reach for this when only the exit status matters; pair it with a consumed stream on the other slot if you still want one stream’s output (.stdout(|s| s.discard()).stderr(|s| s.broadcast()...) is fine).

Delivery: what happens when a consumer can’t keep up?

  • reliable_for_active_subscribers(): When an active consumer’s buffer fills, the child is paused (its next write blocks) instead of dropping data. The guarantee is scoped to consumers that are already attached: a consumer that arrives after the child has produced output gets nothing unless you also configure a replay policy. Use when completeness of the stream / definitely receiving all output matters more than keeping the child unblocked.
  • best_effort_delivery(): Slow consumers may observe gaps. Line consumers drop the in-progress partial line and resync at the next newline. Use when latency matters more than completeness. This never blocks the child.

Replay: what does a consumer attached after spawn see?

  • .no_replay(): Live output only. A consumer that attaches after the child has already printed something will not see it.
  • .replay_last_bytes(...) / .replay_last_chunks(...) / .replay_all(): The library retains output so a late consumer can start from history. The first two cap retention. .replay_all() is unbounded and should only be used for trusted output volume.

The typical lifecycle for a long-running service is: spawn with a replay policy, attach any consumers you want, then call seal_output_replay() to release the retained history. The seal applies to future consumers: They start at live output. Consumers that were already attached when the seal happens are unaffected, and whatever the delivery policy guaranteed them is still there to read. The per-stream variants seal_stdout_replay() and seal_stderr_replay() cover the case where one stream’s startup phase ends before the other’s.

Collection: how much memory does the library hold?

Use bounded LineCollectionOptions / RawCollectionOptions for untrusted output. Reserve TrustedUnbounded for processes whose output volume is known and controlled. The bounded variants take a CollectionOverflowBehavior that decides what to keep when the cap is reached: DropAdditionalData (default) keeps the first-retained output and discards anything that arrives after the cap is hit, while DropOldestData keeps the newest output by evicting older retained data, which gives you a ring-buffer-shaped tail.

Buffering: how big a read, how deep a queue?

Every chain ends with two buffering knobs:

  • .read_chunk_size(...) is the size of each read() against the child’s pipe. Smaller values let a consumer see partial results from large bursts sooner and cap memory. Larger values reduce per-burst syscall and per-chunk overhead.
  • .max_buffered_chunks(...) is the depth of the in-process queue between the reader task and the consumer. Larger values absorb more bursty output before backpressure or dropping kicks in (depending on the delivery policy). Smaller values cap memory.

Pass DEFAULT_READ_CHUNK_SIZE and DEFAULT_MAX_BUFFERED_CHUNKS unless you have measured a reason to tune them. The defaults (16 KB and 128 chunks) are tuned for general-purpose streaming. Both knobs must be named at the spawn site: the spelled-out values keep the choice visible in code review and stop accidental mismatches between sites that all silently rely on a default.

The async line-aware consumers (inspect_lines_async, collect_lines_async, collect_lines_into_write, and friends that take an AsyncLineSink) materialize every parsed line as an owned String before invoking the per-line callback. The synchronous line consumers can pass the line as Cow::Borrowed straight out of the chunk on the fast path, so they avoid the allocation. Prefer the synchronous flavor when per-line work is non-blocking; reach for the async flavor only when the per-line callback genuinely needs to .await.

§Consumers

tokio-process-tools provides the following consumers out of the box:

  • inspect_lines / inspect_chunks (and their _async variants) run a callback per item and discard the data. Reach for these when only the side effect matters: forward to tracing, count occurrences, react to a milestone.
  • collect_lines / collect_chunks (and _async variants) retain output in memory for later inspection. Reach for these when a test or caller wants to assert on what the child wrote after it has finished and wait_for_completion_with_output doesn’t fit.
  • collect_lines_into_write / collect_chunks_into_write (plus _mapped variants) forward output to an async writer (a file, a TCP sink, anything AsyncWrite). WriteCollectionOptions controls writer-failure handling and LineWriteMode controls whether parsed lines get the stripped \n reattached. See Stream output to a sink for an end-to-end example.

See the Custom consumer / stream visitor example for guidance on how to build your own consumers.

§Consumer lifecycle

Each Consumer<S> handle owns the spawned task. Dropping the handle drops the task, which cancels the consumer immediately. Bind it to a variable, e.g. let _consumer = process.stdout().inspect_line(...), to silence the unused-result warning (if you don’t intend to interact with the handle). The #[must_use] annotations on every consumer-creator is the compiler’s reminder of this contract. If your process is long-lived, make sure to store consumer handles somewhere, so that they are not dropped.

A Consumer exposes three ways to finish. The choice is a trade-off between recovering state (the writer, the collected output) and not hanging:

  • wait() lets the consumer drain naturally. It returns when the stream closes (the child exited and any retained replay has been read out) or the visitor returned Next::Break. Use this when there is a natural endpoint, and you want whatever the consumer produced; this is the path that gives you the writer back from a collect_*_into_write consumer.
  • cancel(timeout) is the bounded cooperative-then-forceful shutdown. The consumer task is signaled to stop at the next safe point. If it observes the cancellation and exits before timeout elapses, you get ConsumerCancelOutcome::Cancelled(sink) with the sink preserved. If timeout elapses first, the task is aborted, and you get ConsumerCancelOutcome::Aborted with the sink dropped. Pick the timeout based on how long the consumer might legitimately need to flush.
  • abort() is unconditional forceful shutdown. The task’s in-flight future is dropped, which means sinks and writers are not returned (you cannot recover the file handle, the partially-collected Vec, etc.). Reach for this when not getting stuck matters more than recovering state, and you do not want to pay any timeout at all.

§Examples

Patterns past the basic spawn-and-wait shown in the Quickstart.

§Wait and collect output

Like the Quickstart spawn, but the stdout/stderr that appeared during the run is captured and returned. The collector here is attached inside the wait helper, after the child has already started producing output, which is why the stream is configured with replay_last_bytes(...). Without that policy, anything the child wrote before the helper attached the collector would be lost (see Core model). The wait timeout covers both process exit and the final drain of stdout/stderr.

use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut process = Process::new(Command::new("ls"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery()
                .replay_last_bytes(1.megabytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .expect("failed to spawn command");

    let line_collection_options = LineCollectionOptions::Bounded {
        max_bytes: 1.megabytes(),
        max_lines: 1024,
        overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
    };
    
    let ProcessOutput {
        status,
        stdout,
        stderr,
    } = process
        .wait_for_completion_with_output(
            Duration::from_secs(30),
            LineOutputOptions {
                line_parsing_options: LineParsingOptions::default(),
                stdout_collection_options: line_collection_options,
                stderr_collection_options: line_collection_options,
            },
        )
        .await
        .unwrap()
        .expect_completed("process should complete");

    println!("exit status: {status:?}");
    println!("stdout lines: {:?}", stdout.lines());
    println!("stderr lines: {:?}", stderr.lines());
}

Use wait_for_completion_with_raw_output() when the child’s output is not UTF-8 line-oriented: binary blobs (image conversion, archive tools), framed protocols, or anything where line parsing would corrupt or lose bytes. The shape of the call is the same; substitute RawOutputOptions for LineOutputOptions, RawCollectionOptions for LineCollectionOptions, and read the result as CollectedBytes { bytes, truncated } on each stream. No LineParsingOptions is involved. RawCollectionOptions offers the same Bounded { max_bytes, overflow_behavior } and TrustedUnbounded variants as the line equivalent.

§Wait for readiness from output

The classic “start a service in an integration test and wait until it logs that it is ready” pattern. wait_for_line() is itself a stream consumer with a required timeout, so under single_subscriber() it competes with anything else reading the stream. Reach for broadcast() whenever readiness detection has to coexist with a logger. Pair it with replay retention so a waiter attached after spawn can still see startup lines, then seal_output_replay() to free that history once the service is up.

use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut process = Process::new(Command::new("my-web-server"))
        .name("api")
        .stdout_and_stderr(|stream| {
            stream
                .broadcast()
                .reliable_for_active_subscribers()
                .replay_last_bytes(1.megabytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .expect("failed to spawn server");

    let stdout = process.stdout();

    let _logs = stdout.inspect_lines(
        |line| {
            eprintln!("{line}");
            Next::Continue
        },
        LineParsingOptions::default(),
    );

    match stdout
        .wait_for_line(
            Duration::from_secs(30),
            |line| line.contains("Server listening on"),
            LineParsingOptions::default(),
        )
        .await
    {
        Ok(WaitForLineResult::Matched) => {}
        Ok(WaitForLineResult::StreamClosed) => panic!("server exited before becoming ready"),
        Ok(WaitForLineResult::Timeout) => panic!("server did not become ready in time"),
        Err(err) => panic!("failed to read stdout: {err}"),
    }

    process.seal_output_replay();

    let _ = process
        .terminate(Duration::from_secs(3), Duration::from_secs(5))
        .await;
}

§Interactive stdin

Children always have stdin piped (there is no opt-out), but the library only flushes/closes the pipe when you ask. Calling stdin().close() lets a child observe EOF before any wait helper runs (some programs only finish on EOF, not on idle). Terminal waits and kill() close any still-open stdin automatically, so the manual close is only needed when the child must see EOF before the wait.

process.stdin() returns a &mut Stdin, an enum with Open(ChildStdin) and Closed variants. Stdin::as_mut() projects that into Option<&mut ChildStdin>, returning None once the slot has transitioned to Closed (either explicitly via stdin().close() or implicitly because a wait helper or kill() has already cleaned it up). The example below pattern-matches with if let Some(stdin) = process.stdin().as_mut() for that reason. The None branch has no other failure mode in the stdin-is-always-piped contract.

use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let line_collection = LineCollectionOptions::Bounded {
        max_bytes: 1.megabytes(),
        max_lines: 1024,
        overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
    };

    let mut process = Process::new(Command::new("cat"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery()
                .replay_last_bytes(1.megabytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .expect("failed to spawn command");

    if let Some(stdin) = process.stdin().as_mut() {
        stdin.write_all(b"hello\n").await.unwrap();
        stdin.write_all(b"world\n").await.unwrap();
        stdin.flush().await.unwrap();
    }

    process.stdin().close();

    let output = process
        .wait_for_completion_with_output(
            Duration::from_secs(30),
            LineOutputOptions {
                line_parsing_options: LineParsingOptions::default(),
                stdout_collection_options: line_collection,
                stderr_collection_options: line_collection,
            },
        )
        .await
        .unwrap()
        .expect_completed("process should complete");

    println!("stdout lines: {:?}", output.stdout.lines());
}

§Stream output to a sink

When the child will produce more output than is reasonable to hold in memory (build tools, long-running servers in CI logs, or anything you want on disk rather than in Vec<String>), point a collector at an AsyncWrite sink instead. The example below tees stdout into a file. Parsed lines are written one per call, with LineWriteMode::AppendLf re-attaching the trailing newline that line parsing strips. WriteCollectionOptions::log_and_continue() keeps collection running if the file write ever fails (disk full, broken pipe to a piped writer, etc.); use fail_fast() instead when a sink failure should stop the collection.

The collector is yours to drive. After the child exits the stream closes, the visitor drains, and wait() returns the file handle so you can flush or close it deterministically.

This example needs the fs Tokio feature in your own crate, in addition to the features listed in the Quickstart.

use std::time::Duration;
use tokio::fs::File;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut process = Process::new(Command::new("my-noisy-tool"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .broadcast()
                .reliable_for_active_subscribers()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .expect("failed to spawn command");

    let log_file = File::create("/tmp/output.log")
        .await
        .expect("failed to open log file");

    let log_collector = process.stdout().collect_lines_into_write(
        log_file,
        LineParsingOptions::default(),
        LineWriteMode::AppendLf,
        WriteCollectionOptions::log_and_continue(),
    );

    let _status = process
        .wait_for_completion(Duration::from_secs(60))
        .await
        .unwrap()
        .expect_completed("process should complete");

    // Stream is closed now that the child has exited; the collector drains. The outer
    // `Result` reflects consumer-infrastructure outcomes (task join, stream read), the
    // inner one reflects whether the writer accepted every byte; with
    // `log_and_continue()` write failures are logged so the inner result is `Ok` here.
    let _log_file = log_collector.wait().await.unwrap().unwrap();
}

§Timeout and Cleanup

For processes that may hang, wait_for_completion_or_terminate() rolls the “interrupt → terminate → kill” escalation into the wait so you don’t have to sequence cleanup yourself. The three durations correspond to: how long to wait for a clean exit, how long to give the child after the interrupt signal, and how long to give it after the terminate signal before falling back to kill().

use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut process = Process::new(Command::new("potentially-hanging-process"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .broadcast()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .expect("failed to spawn command");

    match process
        .wait_for_completion_or_terminate(WaitForCompletionOrTerminateOptions {
            wait_timeout: Duration::from_secs(30),
            interrupt_timeout: Duration::from_secs(3),
            terminate_timeout: Duration::from_secs(5),
        })
        .await
    {
        Ok(WaitForCompletionOrTerminateResult::Completed(status)) => {
            println!("completed with status: {status:?}");
        }
        Ok(WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
            result,
            timeout,
        }) => {
            println!("terminated after waiting {timeout:?}; status: {result:?}");
        }
        Err(err) => eprintln!("wait or cleanup failed: {err}"),
    }
}

§Custom consumer / stream visitor

When the built-in inspect_* and collect_* consumers don’t fit, implement StreamVisitor (sync) or AsyncStreamVisitor (async) directly and attach it with consume_with / consume_with_async. The cases that pay for a custom visitor are: maintaining state across chunks, reacting explicitly to gap notifications, or producing an output type the built-ins don’t expose. If all you need is a side effect per chunk or per line, reach for inspect_chunks / inspect_lines; the closure form is simpler.

Pick StreamVisitor when on_chunk can return without .await-ing (counters, in-memory state machines, channel try_send, simple synchronous I/O). Pick AsyncStreamVisitor when chunk handling needs to await (network sends, async writers, channel send with backpressure). Both traits require Send + 'static, and both return their final value via into_output(self) at the end of the consumer task’s life.

The example below tracks how many bytes and chunks the stream produced, plus how many gap events the backend reported. on_gap is invoked when one or more chunks were dropped between the previous and next observed chunk (only possible under best_effort_delivery() overflow), and visitors that splice bytes across chunks should treat it as a reset signal.

use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[derive(Debug, Default)]
struct ChunkStats {
    bytes: usize,
    chunks: usize,
    gaps: usize,
}

impl StreamVisitor for ChunkStats {
    type Output = ChunkStats;

    fn on_chunk(&mut self, chunk: Chunk) -> Next {
        self.bytes += chunk.as_ref().len();
        self.chunks += 1;
        Next::Continue
    }

    fn on_gap(&mut self) {
        self.gaps += 1;
    }

    fn into_output(self) -> ChunkStats {
        self
    }
}

#[tokio::main]
async fn main() {
    let mut process = Process::new(Command::new("ls"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery()
                .replay_last_bytes(1.megabytes())
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .expect("failed to spawn command");

    let counter = process
        .stdout()
        .consume_with(ChunkStats::default())
        .expect("no other consumer is attached yet");

    let _status = process
        .wait_for_completion(Duration::from_secs(30))
        .await
        .unwrap()
        .expect_completed("process should complete");

    let stats = counter.wait().await.unwrap();
    println!(
        "stdout: {} bytes across {} chunks ({} gaps)",
        stats.bytes, stats.chunks, stats.gaps,
    );
}

A few lifecycle quirks worth knowing. on_gap() is synchronous in both traits because gap notifications carry no data to await on. on_eof() is skipped when on_chunk returned Next::Break, on the assumption that an early-break visitor already has its result; cancellation and abort skip on_eof for the same reason. Whichever exit path was taken, into_output always runs, so Consumer::wait() and cancel() always have a value to yield.

§Further APIs

The Quickstart and Examples cover the common path. The rest of the surface is small but purposeful. Here is when you would reach for each piece.

§Wait helper variants

wait_for_completion_with_output() and friends come in four combinations: line vs. raw bytes, and plain wait vs. wait-or-terminate. Pick the variant that matches what you need:

  • Line vs. raw. Line collection parses on the fly and gives you Vec<String>-style output; raw collection gives you the bytes the child actually wrote, which matters when output isn’t UTF-8 line-oriented.
  • Plain vs. _or_terminate. The _or_terminate variants embed the same interrupt → terminate → kill escalation shown in Timeout and Cleanup, so a hung child still gets cleaned up before the helper returns.

Each plain wait helper returns a WaitForCompletionResult (Completed or Timeout); the _or_terminate variants return WaitForCompletionOrTerminateResult (Completed or TerminatedAfterTimeout). The examples above use .expect_completed(msg) to panic on the non-completed branch when a timeout would be a hard failure; reach for .into_completed() instead when the caller should handle both outcomes itself.

When you need output collection and hung-child cleanup together, reach for the combined wait_for_completion_with_output_or_terminate() (or its _with_raw_output_or_terminate() sibling). It takes the same LineOutputOptions / RawOutputOptions you would pass to _with_output, plus the three durations you would pass to _or_terminate, with no extra ceremony.

For trusted, bounded-by-construction output, both LineCollectionOptions and RawCollectionOptions have a TrustedUnbounded variant that opts out of memory caps.

§Line parsing

LineParsingOptions controls how chunks are split into lines and what happens at the limits. wait_for_line() takes the same LineParsingOptions so its matcher sees the same lines a collector would. Three fields:

  • max_line_length (default 16 KB) caps a single line. It must be greater than zero; pass NumBytes::MAX explicitly to opt into unbounded line parsing on a trusted source. LineOverflowBehavior decides what happens past the cap: DropAdditionalData discards bytes until the next newline (default), and EmitAdditionalAsNewLines splits the long line into successive emitted lines so no data is lost.
  • buffer_compaction_threshold (default None) is a steady-state memory knob. The parser’s two internal buffers retain capacity for whichever was the largest line they have ever held; setting Some(n) forces buffers above n to be released between lines. Useful with mostly-small lines and occasional outliers (especially under NumBytes::MAX), counterproductive when typical lines are already near the threshold.

When chunks drop under best_effort_delivery(), line parsers conservatively discard the in-progress partial line and resynchronize at the next newline rather than splicing across the gap. Callers should not assume that a missing newline after a gap means “the next line continues the previous one”; treat each post-gap line as fresh.

§Manual process control

When the wait-and-cleanup helpers do not fit, drive shutdown by hand: send_interrupt_signal(), send_terminate_signal(), terminate() (escalation up to kill), and kill() (immediate). Each returns a typed TerminationError describing which step failed. The signal-send methods are idempotent against a child that has already exited: they reap the status and return Ok(()) without sending a signal at a possibly-recycled PID. seal_stdout_replay(), seal_stderr_replay(), and seal_output_replay() end replay retention on one stream or both.

§Process management

§Process naming

A process name is required before stdout/stderr can be configured. The name appears in error messages and tracing spans, and the library refuses to spawn anonymous processes because anonymous failures are hard to debug.

For production workflows, prefer a stable explicit label ("api-worker", format!("agent-{id}")) so logs and metrics group cleanly. The AutoName variants are convenient for ad-hoc and test code, but the variants that include arguments, environment, or current directory will surface those values in error messages, so only use them when those values are safe to log.

Canonical naming forms:

  • Explicit label: .name("api-worker") or .name(format!("agent-{id}")). The recommended default.
  • Safe automatic: .name(AutoName::program_only()). Uses just the program name; no arguments, environment, or path are leaked into errors.
  • Detailed automatic: .name(AutoName::program_with_args()), .name(AutoName::program_with_env_and_args()), .name(AutoName::full()) (cwd + env + program + args), or .name(AutoName::Debug) for the full Command debug representation. Useful in tests; check the redaction story before using in production.
  • Custom automatic: .name(AutoNameSettings::builder()...build()) to opt into a specific subset of fields.
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let id = 42;
    let mut process = Process::new(Command::new("true"))
        .name(format!("agent-{id}"))
        .stdout_and_stderr(|stream| {
            stream
                .single_subscriber()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .unwrap();

    process
        .wait_for_completion(Duration::from_secs(30))
        .await
        .unwrap()
        .expect_completed("process should complete");
}

§Automatic termination on drop

A live ProcessHandle that is dropped without a successful terminal wait, terminate(), or kill() runs best-effort cleanup and then panics. The panic is deliberate: a silent leak of a child process is much harder to detect than a loud panic in tests, and most “I forgot to wait” bugs would otherwise survive into production.

For long-lived processes that follow a service’s own lifecycle, opting in to .terminate_on_drop(...) swaps the panic for an async termination attempt during drop:

use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let _process = Process::new(Command::new("some-command"))
        .name(AutoName::program_only())
        .stdout_and_stderr(|stream| {
            stream
                .broadcast()
                .best_effort_delivery()
                .no_replay()
                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
        })
        .spawn()
        .unwrap()
        .terminate_on_drop(Duration::from_secs(3), Duration::from_secs(5));
}

terminate_on_drop requires a multithreaded Tokio runtime, because drop runs the termination attempt on Tokio’s blocking pool. If the attempt itself fails, the inner handle still falls back to the panic path so the failure is not silent. Under #[tokio::test], this means opting in to the multi-thread flavor:

#[tokio::test(flavor = "multi_thread")]
async fn test() {
    // ...
}

When the child genuinely needs to outlive the handle (e.g. a daemon you spawned intentionally), call must_not_be_terminated(). That suppresses both the kill and the panic on drop. The library still drops its owned stdio, however, so the child loses access to its piped stdin and the parent loses access to its stdout/stderr at the same moment.

To keep the child and its Stdin/Stdout/Stderr together, use ProcessHandle::into_inner() to take ownership of all four.

§Platform support

The termination escalation is per-platform but follows the same idea: try the gentle signal first (so the child can flush, close sockets, persist state), then a stronger signal, then unconditional kill. Pick the timeouts you pass to terminate() and the _or_terminate helpers based on how long your child legitimately needs at each step. Each step targets the child’s process group rather than its PID alone, so grandchildren are signaled with the leader; see Subprocess trees for the spawn-time setup that makes this work.

  • Linux/macOS: SIGINTSIGTERMSIGKILL, all delivered to the child’s process group via killpg.
  • Windows: CTRL_BREAK_EVENT to the console process group → TerminateProcess on the leader as the kill fallback.

The Windows path uses CTRL_BREAK_EVENT rather than CTRL_C_EVENT because Windows does not allow targeting CTRL_C_EVENT at a nonzero child process group. A child that only handles Ctrl+C will not respond to either signal, and needs its own graceful shutdown channel (stdin, IPC, or a command protocol).

Signal sends (send_interrupt_signal, send_terminate_signal, and the equivalent steps inside terminate()) check for an already-exited child first and return Ok(()) without sending a signal in that case, so calling them after the child has died is harmless rather than racy.

§Subprocess trees

Every spawned child is set up as the leader of its own process group, so termination signals reach the whole subtree rather than only the child the library spawned. If your child fork-execs further processes (an npm start that launches node, a shell wrapper that launches the real binary, a build tool that fans out workers), the grandchildren inherit the group and are signaled along with the leader by send_interrupt_signal(), send_terminate_signal(), terminate(), and kill().

  • Linux/macOS: the child is spawned with process_group(0), making its PID equal to its PGID. SIGINT, SIGTERM, and SIGKILL are delivered with killpg, which targets the whole group.
  • Windows: the child is spawned with CREATE_NEW_PROCESS_GROUP. CTRL_BREAK_EVENT is delivered to that console process group via GenerateConsoleCtrlEvent. The forceful-kill fallback (TerminateProcess) still targets the leader, so a graceful interrupt window is the right place to let cooperative children unwind.

A grandchild that explicitly detaches itself (setsid/setpgid to leave the group, double-fork into init, or its own job-object on Windows) is by definition outside the group and will not be signaled. That is correct: such children have asked to outlive their parent. If you do not own the leaf, and it leaks descendants of its own, run it under a small reaper (tini is the usual choice) so the descendants have a designated owner.

§MSRV

MSRV is 1.89.0.

§Contributing

Contributions are welcome!

  1. Create a feature branch.
  2. Run just verify on your changes.
  3. Don’t forget to add a CHANGELOG entry under the # [Unreleased] section.
  4. Open a pull request describing the behavior change and any relevant README updates.

§License

Licensed under either of:

at your option.

Structs§

AutoNameSettings
Controls in detail which parts of the command are automatically captured as the process name.
BestEffortDelivery
Best-effort stream delivery marker.
BroadcastOutputStream
The output stream from a process using a multi-consumer broadcast backend.
Chunk
A “chunk” is an arbitrarily sized byte slice read from the underlying stream. The slices’ length is at max of the previously configured maximum chunk_size.
CollectLineSink
LineSink holding the user closure and a sink; on_line calls the closure with the line and a &mut borrow of the sink. Compose with LineAdapter to drive collect_lines, or to build your own custom collect-lines consumer outside the built-in factory methods.
CollectLineSinkAsync
AsyncLineSink holding the user collector and a sink. Compose with LineAdapter (its AsyncStreamVisitor impl is selected automatically when the inner sink is an AsyncLineSink) to drive collect_lines_async.
CollectedBytes
Raw bytes collected from an output stream.
CollectedLines
Parsed lines collected from an output stream.
Consumer
A handle for a tokio task that consumes a stream by driving a visitor over its events.
DiscardedOutputStream
Marker stream for a stdio slot configured with std::process::Stdio::null().
DiscardedStreamConfig
Configuration produced by ProcessStreamBuilder::discard. The matching child stdio slot is set to Stdio::null(), so the OS discards the bytes; no pipe is allocated and no reader task runs in the parent.
InspectLineSink
LineSink wrapping a per-line closure. Compose with LineAdapter to drive inspect_lines, or to build your own custom inspect-lines consumer outside the built-in factory methods.
InspectLineSinkAsync
AsyncLineSink wrapping a per-line async closure. Compose with LineAdapter (its AsyncStreamVisitor impl is selected automatically when the inner sink is an AsyncLineSink) to drive inspect_lines_async. The PhantomData<fn() -> Fut> carries the future’s type onto the struct so callers never name Fut explicitly.
LineAdapter
Adapter that drives LineParser over chunk events and dispatches every emitted line to the inner LineSink (sync) or AsyncLineSink (async).
LineOutputOptions
Options for line output collection from stdout and stderr.
LineParser
Stateful parser for turning arbitrary byte chunks into lines.
LineParsingOptions
Configuration options for parsing lines from a stream.
NoReplay
Marker for streams without replay support.
NumBytes
A wrapper type representing a number of bytes.
Process
Typestate builder for configuring and spawning a process.
ProcessHandle
A handle to a spawned process with captured stdout/stderr streams.
ProcessOutput
Full output of a process that terminated.
ProcessStreamBuilder
Builder for selecting the output stream backend for one process stream.
RawOutputOptions
Options for raw byte output collection from stdout and stderr.
ReliableDelivery
Reliable active-subscriber stream delivery marker.
ReplayEnabled
Marker for streams with replay support.
SingleSubscriberOutputStream
The output stream from a process. Either representing stdout or stderr.
SinkWriteError
Details about a failed async write into a collector sink.
StreamConfig
Shared output stream configuration for all stream backends.
StreamConfigBuilder
Initial builder stage that requires selecting delivery behavior.
StreamReadError
Error emitted when an output stream cannot be read to completion.
TerminateOnDrop
A wrapper that automatically terminates a process when dropped.
TerminationAttemptError
A failed operation recorded while attempting to terminate a process.
WaitForCompletionOrTerminateOptions
Options for waiting until a process exits, terminating it if waiting fails.
WaitForLineSink
LineSink that breaks the moment a predicate accepts a line and remembers whether it has matched yet. Compose with LineAdapter to drive wait_for_line, or to build your own custom predicate-driven consumer outside the built-in factory methods.
WriteCollectionOptions
Options for forwarding collected stream output into an async writer.
WriteLineSink
AsyncLineSink that maps each parsed line through mapper, writes the result via writer, and routes failures through error_handler. Compose with LineAdapter (its AsyncStreamVisitor impl is selected automatically when the inner sink is an AsyncLineSink) to drive collect_lines_into_write and friends, or to build your own custom write-lines consumer outside the built-in factory methods.

Enums§

AutoName
Controls how the process name is automatically generated when not explicitly provided.
CollectionOverflowBehavior
Controls which output is retained once a bounded in-memory collection reaches its limit.
ConsumerCancelOutcome
The result of Consumer::cancel.
ConsumerError
Errors that the Consumer infrastructure itself can raise while driving its stream.
DeliveryGuarantee
Runtime delivery behavior used by typed stream modes.
LineCollectionOptions
Options for collecting parsed output lines into memory.
LineOverflowBehavior
What should happen when a line is too long?
LineWriteMode
Controls how line-based write helpers delimit successive lines.
Next
Control flag to indicate whether processing should continue or break.
ProcessName
Specifies how a process should be named.
RawCollectionOptions
Options for collecting raw output bytes into memory.
ReplayRetention
Replay history retained by replay-enabled streams.
RunningState
Represents the running state of a process.
SinkWriteErrorAction
Action to take after an async writer sink rejects collected output.
SinkWriteOperation
The write operation that failed while forwarding collected output into an async writer.
SpawnError
Errors that can occur when spawning a process.
Stdin
Represents the stdin stream of a child process.
StreamConsumerError
Errors that can occur when creating a stream consumer.
StreamEvent
Event emitted by an output stream backend.
TerminationAttemptOperation
Termination operation that failed.
TerminationAttemptPhase
Termination phase where an attempt error was recorded.
TerminationError
Errors that can occur when terminating a process.
WaitError
Errors that can occur when waiting for process operations.
WaitForCompletionOrTerminateResult
Result of waiting for a process to complete, terminating it if the wait times out.
WaitForCompletionResult
Result of waiting for a process to complete within an explicit timeout.
WaitForLineResult
Result of waiting for an output line matching a predicate.
WaitOrTerminateError
Errors that can occur when waiting for a process with automatic termination on failure.
WaitWithOutputError
Errors that can occur when waiting for a process while collecting its output, with or without automatic termination on timeout.

Constants§

DEFAULT_MAX_BUFFERED_CHUNKS
Default maximum buffered chunks for stdout and stderr streams. 128 slots.
DEFAULT_MAX_LINE_LENGTH
Default maximum line length used by LineParsingOptions::default. 16 kilobytes.
DEFAULT_READ_CHUNK_SIZE
Default chunk size read from the source stream. 16 kilobytes.

Traits§

AsyncChunkCollector
An async collector for raw output chunks.
AsyncLineCollector
An async collector for parsed output lines.
AsyncLineSink
Async per-line action selected by the AsyncStreamVisitor impl of LineAdapter.
AsyncStreamVisitor
An asynchronous visitor that observes stream events and produces a final value.
Delivery
Marker trait implemented by supported stream delivery marker types.
LineSink
Per-line action selected by the StreamVisitor impl of LineAdapter.
NumBytesExt
Extension trait providing convenience constructors for NumBytes.
OutputStream
We support the following implementations:
ProcessStreamConfig
Marker trait for process stream builder configurations.
Replay
Marker trait implemented by supported stream replay marker types.
Sink
A trait for types that can act as sinks for collected stream data.
SinkWriteErrorHandler
Handles async writer sink failures observed by writer collectors.
StreamVisitor
A synchronous visitor that observes stream events and produces a final value.
Subscription
Stream event subscription used by built-in consumers.
TrySubscribable
Output stream backend that can reject consumer subscriptions.