tokio-process-tools 0.6.0

Interact with processes spawned by tokio.

tokio-process-tools

A powerful library for spawning and managing processes in the Tokio runtime with advanced output handling capabilities.

When working with child processes in async Rust, you often need to:

  • Monitor output in real-time without blocking
  • Wait for specific log messages before proceeding
  • Gracefully terminate processes with proper signal handling
  • Collect output for later analysis
  • Handle multiple concurrent consumers of the same stream
  • Prevent spawned processes from leaking or being left running without proper termination

tokio-process-tools tries to make all of this simple and ergonomic.

Features

  • Real-time Output Inspection - Monitor stdout/stderr as they arrive, with sync and async callbacks
  • 🔍 Pattern Matching - Wait for specific output before continuing execution
  • 🎯 Flexible Collection - Gather output into vectors, files, or custom sinks
  • 🔄 Multiple Consumers - Support for both single and broadcast (multi-consumer) stream consumption
  • Graceful Termination - Automatic signal escalation (SIGINT → SIGTERM → SIGKILL)
  • 🛡️ Collection Safety - Fine-grained control over buffers used when collecting stdout/stderr streams
  • 🛡️ Resource Safety - Panic-on-drop guards ensure processes are properly cleaned up
  • ⏱️ Timeout Support - Built-in timeout handling for all operations
  • 🌊 Backpressure Control - Configurable behavior when consumers can't keep up

Quick Start

Add to your Cargo.toml:

[dependencies]
tokio-process-tools = "0.6"
tokio = { version = "1", features = ["process", "sync", "io-util", "rt-multi-thread", "time"] }

Examples

Basic: Spawn and Collect Output

use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = tokio::process::Command::new("ls");
    let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("ls", cmd)
        .expect("Failed to spawn command");

    // Collect all output
    let Output { status, stdout, stderr } = process
        .wait_for_completion_with_output(None, LineParsingOptions::default())
        .await
        .unwrap();

    println!("Exit status: {:?}", status);
    println!("Output: {:?}", stdout);
}

Stream Types: Which to Choose?

As shown by the first example, the ProcessHandle is generic over how the stdout/stderr streams are processed and made available for consumption. There are two implementations to choose from:

SingleSubscriberOutputStream

  • ✅ More efficient (lower memory, no cloning)
  • ✅ Single consumer only
  • ✅ Configurable backpressure handling
  • 💡 Use when: You only need one way to consume output (e.g., just collecting OR just monitoring)

BroadcastOutputStream

  • ✅ Multiple concurrent consumers
  • ✅ Great for logging + collecting + monitoring simultaneously
  • ❌ Slightly higher memory usage
  • 💡 Use when: You need multiple operations on the same stream (e.g., log to console AND save to file)
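
To make the trade-off above concrete, here is a minimal, std-only sketch of the two delivery models. It does not use this crate's types: `deliver_single`, `deliver_broadcast`, and `drain` are hypothetical names chosen for illustration, and the crate's real implementation may differ. The point is only that a single consumer can take ownership of each line, while multiple consumers each need their own copy.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Single-subscriber delivery: each line is moved to the one consumer, no clone.
fn deliver_single(lines: Vec<String>, consumer: &Sender<String>) {
    for line in lines {
        // A send error only means the consumer went away; ignore it in this sketch.
        let _ = consumer.send(line);
    }
}

/// Broadcast delivery: every consumer receives its own clone of each line.
fn deliver_broadcast(lines: Vec<String>, consumers: &[Sender<String>]) {
    for line in lines {
        for consumer in consumers {
            let _ = consumer.send(line.clone());
        }
    }
}

/// Helper: collect everything currently queued on a receiver without blocking.
fn drain(rx: &Receiver<String>) -> Vec<String> {
    rx.try_iter().collect()
}
```

The per-consumer clone in `deliver_broadcast` is where the extra memory usage of a multi-consumer stream would come from in a design like this.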

Output Handling

Monitor Output in Real-Time

use std::time::Duration;
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut cmd = tokio::process::Command::new("tail");
    cmd.arg("-f").arg("/var/log/app.log");

    let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("tail", cmd).unwrap();

    // Inspect output in real-time
    let _stdout_monitor = process.stdout_mut().inspect_lines(
        |line| {
            println!("stdout: {line}");
            Next::Continue
        },
        LineParsingOptions::default()
    );

    // Let it run for a while
    tokio::time::sleep(Duration::from_secs(10)).await;

    // Gracefully terminate
    process.terminate(
        Duration::from_secs(3),  // SIGINT timeout
        Duration::from_secs(5),  // SIGTERM timeout
    ).await.unwrap();
}

Wait for Specific Output

Perfect for integration tests or ensuring services are ready:

use std::time::Duration;
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = tokio::process::Command::new("my-web-server");
    let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("server", cmd).unwrap();

    // Wait for the server to be ready
    match process.stdout_mut().wait_for_line_with_timeout(
        |line| line.contains("Server listening on"),
        LineParsingOptions::default(),
        Duration::from_secs(30),
    ).await {
        Ok(_) => println!("Server is ready!"),
        Err(_) => panic!("Server failed to start in time"),
    }

    // Now safe to make requests to the server
    // ...

    // Cleanup
    process.wait_for_completion_or_terminate(
        Duration::from_secs(5),   // Wait timeout
        Duration::from_secs(3),   // SIGINT timeout
        Duration::from_secs(5),   // SIGTERM timeout
    ).await.unwrap();
}
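
The idea behind waiting for a line can be sketched without the crate: read lines until a predicate matches, then stop. The function below is a synchronous, std-only illustration; `first_matching_line` is a hypothetical name, and the timeout handling that the example above relies on is deliberately not modelled.

```rust
use std::io::{self, BufRead};

/// Reads lines until `predicate` returns true and returns the matching line.
/// Returns `Ok(None)` if the input ends without a match.
fn first_matching_line<R, F>(reader: R, mut predicate: F) -> io::Result<Option<String>>
where
    R: BufRead,
    F: FnMut(&str) -> bool,
{
    for line in reader.lines() {
        let line = line?; // Propagate I/O and UTF-8 errors to the caller.
        if predicate(&line) {
            return Ok(Some(line));
        }
    }
    Ok(None)
}
```

Any `BufRead` source works as input, for example `"booting\nServer listening on :8080\n".as_bytes()` in a test.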

Working with Multiple Consumers

use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = tokio::process::Command::new("long-running-process");
    let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("process", cmd).unwrap();

    // Consumer 1: Log to console
    let _logger = process.stdout().inspect_lines(
        |line| {
            eprintln!("[LOG] {}", line);
            Next::Continue
        },
        LineParsingOptions::default()
    );

    // Consumer 2: Collect to file
    let log_file = tokio::fs::File::create("output.log").await.unwrap();
    let _file_writer = process.stdout().collect_lines_into_write(
        log_file,
        LineParsingOptions::default()
    );

    // Consumer 3: Search for errors
    let error_collector = process.stdout().collect_lines(
        Vec::new(),
        |line, vec| {
            if line.contains("ERROR") {
                vec.push(line);
            }
            Next::Continue
        },
        LineParsingOptions::default()
    );

    // Wait for completion
    process.wait_for_completion(None).await.unwrap();

    // Get collected errors
    let errors = error_collector.wait().await.unwrap();
    println!("Found {} errors", errors.len());
}

Advanced Processing

Chunk-Based Processing

For binary data or when you need raw bytes instead of lines:

use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let mut cmd = tokio::process::Command::new("cat");
    cmd.arg("binary-file.dat");

    let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("cat", cmd).unwrap();

    // Process raw chunks of bytes
    let chunk_collector = process.stdout().collect_chunks(
        Vec::new(),
        |chunk, buffer| {
            // Process raw bytes (e.g., binary protocol parsing)
            buffer.extend_from_slice(chunk.as_ref());
        }
    );

    process.wait_for_completion(None).await.unwrap();
    let all_bytes = chunk_collector.wait().await.unwrap();

    println!("Collected {} bytes", all_bytes.len());
}

Async Output Processing

use std::time::Duration;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = tokio::process::Command::new("data-processor");
    let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("processor", cmd).unwrap();

    // Process output asynchronously (e.g., send to database)
    let _processor = process.stdout().inspect_lines_async(
        async |line| {
            // Simulate async processing
            process_line_in_database(&line).await;
            Next::Continue
        },
        LineParsingOptions::default()
    );

    process.wait_for_completion(None).await.unwrap();
}

async fn process_line_in_database(_line: &str) {
    // Your async logic here
    tokio::time::sleep(Duration::from_millis(10)).await;
}

Custom Collectors

use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = Command::new("some-command");
    let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();

    #[derive(Debug)]
    struct MyCollector {}

    impl MyCollector {
        fn process_line(&mut self, line: String) {
            dbg!(line);
        }
    }

    // Collect into any type implementing the Sink trait
    let custom_collector = process.stdout().collect_lines(
        MyCollector {},
        |line, custom| {
            custom.process_line(line);
            Next::Continue
        },
        LineParsingOptions::default()
    );

    let result = custom_collector.wait().await.unwrap();
}

Mapped Output

Transform each line before it is written. The sink must support writing the type returned by the map closure.

use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = Command::new("some-command");
    let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();

    let log_file = tokio::fs::File::create("output.log").await.unwrap();

    let collector = process.stdout().collect_lines_into_write_mapped(
        log_file,
        |line| format!("[stdout] {line}\n"),
        LineParsingOptions::default()
    );
}
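
As a rough, std-only picture of what mapping before writing means, the sketch below applies a closure to each line and writes the result into any `std::io::Write` sink. `write_lines_mapped` is a hypothetical helper, not part of this crate, and it is synchronous where the crate's version works on async streams.

```rust
use std::io::{self, Write};

/// Applies `map` to every line and writes the mapped text into `sink`.
/// Returns the sink so the caller can inspect or reuse it.
fn write_lines_mapped<W, I, F>(lines: I, mut sink: W, mut map: F) -> io::Result<W>
where
    W: Write,
    I: IntoIterator<Item = String>,
    F: FnMut(String) -> String,
{
    for line in lines {
        // The closure decides on prefixes and line endings, as in the example above.
        sink.write_all(map(line).as_bytes())?;
    }
    sink.flush()?;
    Ok(sink)
}
```

Passing `Vec::new()` as the sink makes the output easy to check in a unit test; a `std::fs::File` would work the same way.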

Custom Line Parsing

The LineParsingOptions type controls how data is read from stdout/stderr streams.

use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = Command::new("some-command");
    let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();
    process.stdout().wait_for_line(
        |line| line.contains("Ready"),
        LineParsingOptions {
            max_line_length: 1.megabytes(),  // Protect against memory exhaustion
            overflow_behavior: LineOverflowBehavior::DropAdditionalData,
        },
    ).await;
}
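
To illustrate why a line length cap protects against memory exhaustion, here is a small std-only sketch of capped line splitting over raw chunks. It assumes that dropping additional data means discarding bytes beyond the cap until the next newline; that reading of `DropAdditionalData` is an assumption, and `split_lines_capped` is a hypothetical function, not the crate's parser.

```rust
/// Splits raw chunks into lines, keeping at most `max_line_length` bytes per line.
/// Bytes beyond the cap are discarded until the next `\n` (assumed semantics).
fn split_lines_capped(chunks: &[&[u8]], max_line_length: usize) -> Vec<String> {
    let mut lines = Vec::new();
    let mut current: Vec<u8> = Vec::new();
    for chunk in chunks {
        for &byte in chunk.iter() {
            if byte == b'\n' {
                lines.push(String::from_utf8_lossy(&current).into_owned());
                current.clear();
            } else if current.len() < max_line_length {
                current.push(byte);
            }
            // Otherwise the line is already at the cap and this byte is dropped.
        }
    }
    // Emit a trailing line that was not newline-terminated.
    if !current.is_empty() {
        lines.push(String::from_utf8_lossy(&current).into_owned());
    }
    lines
}
```

Because `current` never grows past `max_line_length`, a process that prints one endless line cannot make the buffer grow without bound.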

Process Management

Timeout with Automatic Termination

use std::time::Duration;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = tokio::process::Command::new("potentially-hanging-process");
    let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("process", cmd).unwrap();

    // Automatically terminate if it takes too long
    match process.wait_for_completion_or_terminate(
        Duration::from_secs(30),  // Wait for 30s
        Duration::from_secs(3),   // Then send SIGINT, wait 3s
        Duration::from_secs(5),   // Then send SIGTERM, wait 5s
        // If still running, sends SIGKILL
    ).await {
        Ok(status) => println!("Completed with status: {:?}", status),
        Err(e) => eprintln!("Termination failed: {}", e),
    }
}
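
The escalation order described in the comments above (SIGINT, then SIGTERM, then SIGKILL) can be modelled as a small piece of pure logic. This is a sketch under stated assumptions: `Signal`, `escalate`, and the `send_and_wait` callback are hypothetical names, no real signals are sent, and the crate's actual control flow may differ.

```rust
use std::time::Duration;

/// The signals sent during graceful termination, in escalation order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Signal {
    Interrupt, // SIGINT
    Terminate, // SIGTERM
    Kill,      // SIGKILL
}

/// Walks the escalation ladder. `send_and_wait` is a hypothetical callback that
/// delivers the signal and reports whether the process exited within the timeout.
/// Returns the signal that finally ended the process.
fn escalate<F>(
    interrupt_timeout: Duration,
    terminate_timeout: Duration,
    mut send_and_wait: F,
) -> Signal
where
    F: FnMut(Signal, Option<Duration>) -> bool,
{
    if send_and_wait(Signal::Interrupt, Some(interrupt_timeout)) {
        return Signal::Interrupt;
    }
    if send_and_wait(Signal::Terminate, Some(terminate_timeout)) {
        return Signal::Terminate;
    }
    // SIGKILL cannot be caught or ignored, so no timeout is modelled for it.
    send_and_wait(Signal::Kill, None);
    Signal::Kill
}
```

Keeping the ladder separate from the signal delivery makes the ordering testable with a plain closure that records which signals were requested.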

Automatic Termination on Drop

use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;

#[tokio::main]
async fn main() {
    let cmd = Command::new("some-command");
    let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd)
        .unwrap()
        .terminate_on_drop(Duration::from_secs(3), Duration::from_secs(5));

    // Process is automatically terminated when dropped.
    // Requires a multithreaded runtime!
}
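
The general pattern behind cleanup on drop is a guard type whose `Drop` implementation runs the cleanup. The std-only sketch below shows that pattern in isolation; `CleanupGuard` is a hypothetical type, and it runs a synchronous closure, whereas terminating a real child process involves async work (which is presumably why the crate asks for a multithreaded runtime).

```rust
/// Runs a cleanup action when dropped, unless it was explicitly disarmed.
struct CleanupGuard<F: FnMut()> {
    cleanup: F,
    armed: bool,
}

impl<F: FnMut()> CleanupGuard<F> {
    fn new(cleanup: F) -> Self {
        Self { cleanup, armed: true }
    }

    /// Call this after the resource was already cleaned up by other means.
    fn disarm(&mut self) {
        self.armed = false;
    }
}

impl<F: FnMut()> Drop for CleanupGuard<F> {
    fn drop(&mut self) {
        if self.armed {
            (self.cleanup)();
        }
    }
}
```

The guard runs its closure when it goes out of scope, including during unwinding from a panic, which is what makes the pattern useful for preventing leaked processes.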

Testing Integration

Note: If you use this library's TerminateOnDrop in a test, ensure that a multithreaded runtime is used:

#[tokio::test(flavor = "multi_thread")]
async fn test() {
    // ...
}

Platform Support

  • Linux/macOS: Using SIGINT, SIGTERM, SIGKILL
  • Windows: Using CTRL_C_EVENT, CTRL_BREAK_EVENT

Requirements

  • Rust 1.85.0 or later (edition 2024)

Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Ensure cargo fmt and cargo clippy pass
  4. Add tests for new functionality
  5. Submit a pull request

License

Licensed under either of:

at your option.