tokio-process-tools
A powerful library for spawning and managing processes in the Tokio runtime with advanced output handling capabilities.
When working with child processes in async Rust, you often need to:
- Monitor output in real-time without blocking
- Wait for specific log messages before proceeding
- Gracefully terminate processes with proper signal handling
- Collect output for later analysis
- Handle multiple concurrent consumers of the same stream
- Prevent spawned processes from leaking, not being terminated properly
tokio-process-tools tries to make all of this simple and ergonomic.
Features
- ✨ Real-time Output Inspection - Monitor stdout/stderr as they arrive, with sync and async callbacks
- 🔍 Pattern Matching - Wait for specific output before continuing execution
- 🎯 Flexible Collection - Gather output into vectors, files, or custom sinks
- 🔄 Multiple Consumers - Support for both single and broadcast (multi-consumer) stream consumption
- ⚡ Graceful Termination - Automatic signal escalation (SIGINT → SIGTERM → SIGKILL)
- 🛡️ Collection Safety - Fine-grained control over buffers used when collecting stdout/stderr streams
- 🛡️ Resource Safety - Panic-on-drop guards ensure processes are properly cleaned up
- ⏱️ Timeout Support - Built-in timeout handling for all operations
- 🌊 Backpressure Control - Configurable behavior when consumers can't keep up
Quick Start
Add to your Cargo.toml:
[dependencies]
tokio-process-tools = "0.6"
tokio = { version = "1", features = ["process", "sync", "io-util", "rt-multi-thread", "time"] }
Examples
Basic: Spawn and Collect Output
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("ls");
let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("ls", cmd)
.expect("Failed to spawn command");
let Output { status, stdout, stderr } = process
.wait_for_completion_with_output(None, LineParsingOptions::default())
.await
.unwrap();
println!("Exit status: {:?}", status);
println!("Output: {:?}", stdout);
}
Stream Types: Which to Choose?
As shown by the first example, the ProcessHandle is generic over how the stdout/stderr streams are processed and made
available for consumption. There are two implementations to choose from:
SingleSubscriberOutputStream
- ✅ More efficient (lower memory, no cloning)
- ✅ Single consumer only
- ✅ Configurable backpressure handling
- 💡 Use when: You only need one way to consume output (e.g., just collecting OR just monitoring)
BroadcastOutputStream
- ✅ Multiple concurrent consumers
- ✅ Great for logging + collecting + monitoring simultaneously
- ❌ Slightly higher memory usage
- 💡 Use when: You need multiple operations on the same stream (e.g., log to console AND save to file)
Output Handling
Monitor Output in Real-Time
use std::time::Duration;
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("tail");
cmd.arg("-f").arg("/var/log/app.log");
let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("tail", cmd).unwrap();
let _stdout_monitor = process.stdout_mut().inspect_lines(
|line| {
println!("stdout: {line}");
Next::Continue
},
LineParsingOptions::default()
);
tokio::time::sleep(Duration::from_secs(10)).await;
process.terminate(
Duration::from_secs(3), Duration::from_secs(5), ).await.unwrap();
}
Wait for Specific Output
Perfect for integration tests or ensuring services are ready:
use std::time::Duration;
use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("my-web-server");
let mut process = ProcessHandle::<SingleSubscriberOutputStream>::spawn("server", cmd).unwrap();
match process.stdout_mut().wait_for_line_with_timeout(
|line| line.contains("Server listening on"),
LineParsingOptions::default(),
Duration::from_secs(30),
).await {
Ok(_) => println!("Server is ready!"),
Err(_) => panic!("Server failed to start in time"),
}
process.wait_for_completion_or_terminate(
Duration::from_secs(5), Duration::from_secs(3), Duration::from_secs(5), ).await.unwrap();
}
Working with Multiple Consumers
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("long-running-process");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("process", cmd).unwrap();
let _logger = process.stdout().inspect_lines(
|line| {
eprintln!("[LOG] {}", line);
Next::Continue
},
LineParsingOptions::default()
);
let log_file = tokio::fs::File::create("output.log").await.unwrap();
let _file_writer = process.stdout().collect_lines_into_write(
log_file,
LineParsingOptions::default()
);
let error_collector = process.stdout().collect_lines(
Vec::new(),
|line, vec| {
if line.contains("ERROR") {
vec.push(line);
}
Next::Continue
},
LineParsingOptions::default()
);
process.wait_for_completion(None).await.unwrap();
let errors = error_collector.wait().await.unwrap();
println!("Found {} errors", errors.len());
}
Advanced Processing
Chunk-Based Processing
For binary data or when you need raw bytes instead of lines:
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("cat");
cmd.arg("binary-file.dat");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("cat", cmd).unwrap();
let chunk_collector = process.stdout().collect_chunks(
Vec::new(),
|chunk, buffer| {
buffer.extend_from_slice(chunk.as_ref());
}
);
process.wait_for_completion(None).await.unwrap();
let all_bytes = chunk_collector.wait().await.unwrap();
println!("Collected {} bytes", all_bytes.len());
}
Async Output Processing
use std::time::Duration;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("data-processor");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("processor", cmd).unwrap();
let _processor = process.stdout().inspect_lines_async(
async |line| {
process_line_in_database(&line).await;
Next::Continue
},
LineParsingOptions::default()
);
process.wait_for_completion(None).await.unwrap();
}
async fn process_line_in_database(line: &str) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
Custom Collectors
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let cmd = Command::new("some-command");
let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();
#[derive(Debug)]
struct MyCollector {}
impl MyCollector {
fn process_line(&mut self, line: String) {
dbg!(line);
}
}
let custom_collector = process.stdout().collect_lines(
MyCollector {},
|line, custom| {
custom.process_line(line);
Next::Continue
},
LineParsingOptions::default()
);
let result = custom_collector.wait().await.unwrap();
}
Mapped Output
Transform output before writing into sink supporting the returned by the map closure.
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let cmd = Command::new("some-command");
let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();
let log_file = tokio::fs::File::create("output.log").await.unwrap();
let collector = process.stdout().collect_lines_into_write_mapped(
log_file,
|line| format!("[stdout] {line}\n"),
LineParsingOptions::default()
);
}
Custom Line Parsing
The LineParsingOptions type controls how data is read from stdout/stderr streams.
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = Command::new("some-command");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd).unwrap();
process.stdout().wait_for_line(
|line| line.contains("Ready"),
LineParsingOptions {
max_line_length: 1.megabytes(), overflow_behavior: LineOverflowBehavior::DropAdditionalData,
},
).await;
}
Process Management
Timeout with Automatic Termination
use std::time::Duration;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let mut cmd = tokio::process::Command::new("potentially-hanging-process");
let mut process = ProcessHandle::<BroadcastOutputStream>::spawn("process", cmd).unwrap();
match process.wait_for_completion_or_terminate(
Duration::from_secs(30), Duration::from_secs(3), Duration::from_secs(5), ).await {
Ok(status) => println!("Completed with status: {:?}", status),
Err(e) => eprintln!("Termination failed: {}", e),
}
}
Automatic Termination on Drop
use std::time::Duration;
use tokio::process::Command;
use tokio_process_tools::broadcast::BroadcastOutputStream;
use tokio_process_tools::*;
#[tokio::main]
async fn main() {
let cmd = Command::new("some-command");
let process = ProcessHandle::<BroadcastOutputStream>::spawn("cmd", cmd)
.unwrap()
.terminate_on_drop(Duration::from_secs(3), Duration::from_secs(5));
}
Testing Integration
Note: If you use this libraries TerminateOnDrop under a test, ensure that a multithreaded runtime is used with:
#[tokio::test(flavor = "multi_thread")]
async fn test() {
}
Platform Support
- ✅ Linux/macOS: Using SIGINT, SIGTERM, SIGKILL
- ✅ Windows: Using CTRL_C_EVENT, CTRL_BREAK_EVENT
Requirements
- Rust 1.85.0 or later (edition 2024)
Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Ensure
cargo fmt and cargo clippy pass
- Add tests for new functionality
- Submit a pull request
License
Licensed under either of:
at your option.