Expand description
§tokio-process-tools
A powerful library for spawning and managing processes in the Tokio runtime with advanced output handling capabilities.
When working with child processes in async Rust, you often need to:
- Monitor output in real-time without blocking
- Wait for specific log messages before proceeding
- Gracefully terminate processes with proper signal handling
- Collect output for later analysis
- Handle multiple concurrent consumers of the same stream
- Prevent spawned processes from leaking, not being terminated properly
tokio-process-tools tries to make all of this simple and ergonomic.
§Features
- ✨ Real-time Output Inspection - Monitor stdout/stderr as they arrive, with sync and async callbacks
- 🔍 Pattern Matching - Wait for specific output before continuing execution
- 🎯 Flexible Collection - Gather output into vectors, files, or custom sinks
- 🔄 Multiple Consumers - Support for both single and broadcast (multi-consumer) stream consumption
- ⚡ Graceful Termination - Automatic signal escalation (SIGINT → SIGTERM → SIGKILL)
- 🛡️ Collection Safety - Fine-grained control over buffers used when collecting stdout/stderr streams
- 🛡️ Resource Safety - Panic-on-drop guards ensure processes are properly cleaned up
- ⏱️ Timeout Support - Built-in timeout handling for all operations
- 🌊 Backpressure Control - Configurable behavior when consumers can’t keep up
§Quick Start
Add to your Cargo.toml:
[dependencies]
tokio-process-tools = "0.6"
tokio = { version = "1", features = ["process", "sync", "io-util", "rt-multi-thread", "time"] }§Examples
§Basic: Spawn and Collect Output
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("ls");
let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("ls", cmd)
.expect("Failed to spawn command");
// Collect all output
let Output { status, stdout, stderr } = process
.wait_for_completion_with_output(None, LineParsingOptions::default())
.await
.unwrap();
println!("Exit status: {:?}", status);
println!("Output: {:?}", stdout);
}§Stream Types: Which to Choose?
As shown by the first example, the ProcessHandle is generic over how the stdout/stderr streams are processed and made available for consumption. There are two implementations to choose from:
§SingleSubscriberOutputStream
- ✅ More efficient (lower memory, no cloning)
- ✅ Single consumer only
- ✅ Configurable backpressure handling
- 💡 Use when: You only need one way to consume output (e.g., just collecting OR just monitoring)
§BroadcastOutputStream
- ✅ Multiple concurrent consumers
- ✅ Great for logging + collecting + monitoring simultaneously
- ❌ Slightly higher memory usage
- 💡 Use when: You need multiple operations on the same stream (e.g., log to console AND save to file)
§Output Handling
§Monitor Output in Real-Time
use std::time::Duration;
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("tail");
cmd.arg("-f").arg("/var/log/app.log");
let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("tail", cmd).unwrap();
// Inspect output in real-time
let _stdout_monitor = process.stdout_mut().inspect_lines(
|line| {
println!("stdout: {line}");
Next::Continue
},
LineParsingOptions::default()
);
// Let it run for a while
tokio::time::sleep(Duration::from_secs(10)).await;
// Gracefully terminate
process.terminate(
Duration::from_secs(3), // SIGINT timeout
Duration::from_secs(5), // SIGTERM timeout
).await.unwrap();
}§Wait for Specific Output
Perfect for integration tests or ensuring services are ready:
use std::time::Duration;
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("my-web-server");
let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("server", cmd).unwrap();
// Wait for the server to be ready
match process.stdout_mut().wait_for_line_with_timeout(
|line| line.contains("Server listening on"),
LineParsingOptions::default(),
Duration::from_secs(30),
).await {
Ok(_) => println!("Server is ready!"),
Err(_) => panic!("Server failed to start in time"),
}
// Now safe to make requests to the server
// ...
// Cleanup
process.wait_for_completion_or_terminate(
Duration::from_secs(5), // Wait timeout
Duration::from_secs(3), // SIGINT timeout
Duration::from_secs(5), // SIGTERM timeout
).await.unwrap();
}§Working with Multiple Consumers
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("long-running-process");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("process", cmd).unwrap();
// Consumer 1: Log to console
let _logger = process.stdout().inspect_lines(
|line| {
eprintln!("[LOG] {}", line);
Next::Continue
},
LineParsingOptions::default()
);
// Consumer 2: Collect to file
let log_file = tokio::fs::File::create("output.log").await.unwrap();
let _file_writer = process.stdout().collect_lines_into_write(
log_file,
LineParsingOptions::default()
);
// Consumer 3: Search for errors
let error_collector = process.stdout().collect_lines(
Vec::new(),
|line, vec| {
if line.contains("ERROR") {
vec.push(line);
}
Next::Continue
},
LineParsingOptions::default()
);
// Wait for completion
process.wait_for_completion(None).await.unwrap();
// Get collected errors
let errors = error_collector.wait().await.unwrap();
println!("Found {} errors", errors.len());
}§Advanced Processing
§Chunk-Based Processing
For binary data or when you need raw bytes instead of lines:
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("cat");
cmd.arg("binary-file.dat");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("cat", cmd).unwrap();
// Process raw chunks of bytes
let chunk_collector = process.stdout().collect_chunks(
Vec::new(),
|chunk, buffer| {
// Process raw bytes (e.g., binary protocol parsing)
buffer.extend_from_slice(chunk.as_ref());
}
);
process.wait_for_completion(None).await.unwrap();
let all_bytes = chunk_collector.wait().await.unwrap();
println!("Collected {} bytes", all_bytes.len());
}§Async Output Processing
use std::time::Duration;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("data-processor");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("processor", cmd).unwrap();
// Process output asynchronously (e.g., send to database)
let _processor = process.stdout().inspect_lines_async(
async |line| {
// Simulate async processing
process_line_in_database(&line).await;
Next::Continue
},
LineParsingOptions::default()
);
process.wait_for_completion(None).await.unwrap();
}
async fn process_line_in_database(line: &str) {
// Your async logic here
tokio::time::sleep(Duration::from_millis(10)).await;
}§Custom Collectors
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let cmd = Command::new("some-command");
let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();
#[derive(Debug)]
struct MyCollector {}
impl MyCollector {
fn process_line(&mut self, line: String) {
dbg!(line);
}
}
// Collect into any type implementing the Sink trait
let custom_collector = process.stdout().collect_lines(
MyCollector {},
|line, custom| {
custom.process_line(line);
Next::Continue
},
LineParsingOptions::default()
);
let result = custom_collector.wait().await.unwrap();
}§Mapped Output
Transform output before writing into sink supporting the returned by the map closure.
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let cmd = Command::new("some-command");
let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();
let log_file = tokio::fs::File::create("output.log").await.unwrap();
let collector = process.stdout().collect_lines_into_write_mapped(
log_file,
|line| format!("[stdout] {line}\n"),
LineParsingOptions::default()
);
}§Custom Line Parsing
The LineParsingOptions type controls how data is read from stdout/stderr streams.
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = Command::new("some-command");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();
process.stdout().wait_for_line(
|line| line.contains("Ready"),
LineParsingOptions {
max_line_length: 1.megabytes(), // Protect against memory exhaustion
overflow_behavior: LineOverflowBehavior::DropAdditionalData,
},
).await;
}§Process Management
§Timeout with Automatic Termination
use std::time::Duration;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("potentially-hanging-process");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("process", cmd).unwrap();
// Automatically terminate if it takes too long
match process.wait_for_completion_or_terminate(
Duration::from_secs(30), // Wait for 30s
Duration::from_secs(3), // Then send SIGINT, wait 3s
Duration::from_secs(5), // Then send SIGTERM, wait 5s
// If still running, sends SIGKILL
).await {
Ok(status) => println!("Completed with status: {:?}", status),
Err(e) => eprintln!("Termination failed: {}", e),
}
}§Automatic Termination on Drop
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let cmd = Command::new("some-command");
let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd)
.unwrap()
.terminate_on_drop(Duration::from_secs(3), Duration::from_secs(5));
// Process is automatically terminated when dropped.
// Requires a multithreaded runtime!
}§Testing Integration
Note: If you use this libraries TerminateOnDrop under a test, ensure that a multithreaded runtime is used with:
#[tokio::test(flavor = "multi_thread")]
async fn test() {
// ...
}§Platform Support
- ✅ Linux/macOS: Using SIGINT, SIGTERM, SIGKILL
- ✅ Windows: Using CTRL_C_EVENT, CTRL_BREAK_EVENT
§Requirements
- Rust 1.85.0 or later (edition 2024)
§Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Ensure
cargo fmtandcargo clippypass - Add tests for new functionality
- Submit a pull request
§License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE)
- MIT License (LICENSE-MIT)
at your option.
Modules§
- broadcast
- Broadcast output stream implementation supporting multiple concurrent consumers.
- single_
subscriber - Single subscriber output stream implementation for efficient single-consumer scenarios.
Structs§
- Collector
- A collector for stream data, inspecting it chunk by chunk but also providing mutable access to a sink in which the data can be stored.
- Inspector
- A collector for stream data, inspecting it chunk by chunk.
- Line
Parsing Options - Configuration options for parsing lines from a stream.
- NumBytes
- A wrapper type representing a number of bytes.
- Output
- Full output of a process that terminated.
- Process
Handle - A handle to a spawned process with captured stdout/stderr streams.
- Terminate
OnDrop - A wrapper that automatically terminates a process when dropped.
Enums§
- Collector
Error - Errors that can occur when collecting stream data.
- Inspector
Error - Errors that can occur when inspecting stream data.
- Line
Overflow Behavior - What should happen when a line is too long?
- Next
- Control flag to indicate whether processing should continue or break.
- Running
State - Represents the running state of a process.
- Termination
Error - Errors that can occur when terminating a process.
- Wait
Error - Errors that can occur when waiting for process output.
Traits§
- NumBytes
Ext - Extension trait providing convenience-functions for creation of
NumBytesof certain sizes. - Output
Stream - We support the following implementations:
- Sink
- A trait for types that can act as sinks for collected stream data.