# Streaming & interactive I/O
[‹ docs index](README.md)
The one-shot verbs in [Running commands](commands.md) buffer the whole output.
For long-running or conversational children, `Command::start()` returns a live
`RunningProcess` you drive yourself: stream stdout as it arrives, write stdin
incrementally, probe for readiness, race several children, or profile a run.
- [Lifecycle](#lifecycle)
- [Streaming stdout](#streaming-stdout)
- [Interactive stdin](#interactive-stdin)
- [Readiness probes](#readiness-probes)
- [Racing children with `wait_any`](#racing-children-with-wait_any)
- [Per-run telemetry](#per-run-telemetry)
## Lifecycle
```rust,no_run
use processkit::Command;
let mut run = Command::new("dev-server").start().await?;
run.pid(); // Option<u32> — None once the child is reaped
run.elapsed(); // time since spawn
// Consume the handle exactly one way:
// output_string() / output_bytes() → capture everything (same as the one-shot verbs)
// wait() → just the exit code; output is discarded
// finish_streamed() → after streaming stdout (below)
// profile(every) → capture + resource samples (stats feature)
let code = run.wait().await?; // Option<i32>; None = killed without a code
```
`start()` puts the child in a **private group the handle owns**: dropping the
`RunningProcess` kills the whole tree, exactly like dropping a one-shot run's
future. The shared-group variant — `group.start(&cmd)` — gives the same handle
but the *group* controls the tree's fate (see
[Process groups](process-groups.md#putting-processes-in)).
There is also an explicit `run.start_kill()` for "stop it now, I'll `wait()`
for the code myself".
## Streaming stdout
`stdout_lines()` yields decoded lines as the child produces them — no waiting
for exit, no full-output buffering. `StreamExt` (re-exported from
`tokio-stream`) provides `.next()`:
```rust,no_run
use processkit::{Command, StreamExt};
#[tokio::main]
async fn main() -> processkit::Result<()> {
let mut run = Command::new("cargo")
.args(["build", "--release"])
.start()
.await?;
let mut lines = run.stdout_lines();
while let Some(line) = lines.next().await {
println!("build: {line}");
}
// The stream ended (stdout closed). Collect the exit code and stderr —
// stderr was drained in the background the whole time, so a noisy child
// could never block on a full pipe.
let (code, stderr) = run.finish_streamed().await?;
if code != Some(0) {
eprintln!("build failed ({code:?}):\n{stderr}");
}
Ok(())
}
```
Things to know:
- **Call `stdout_lines()` once.** A second call returns an
immediately-finished stream (the pipe is already being pumped).
- **The command's `timeout` bounds the stream** on an own-group handle: at the
deadline the tree is killed, the pipes close, and the stream ends — a
streamed run can't hang past its deadline. A `cancel_on` token (with the
`cancellation` feature) ends it the same way; the following
`finish_streamed` then reports `Error::Cancelled`. Details in
[Timeouts & cancellation](timeouts-and-cancellation.md).
- Line counters tick live: `run.stdout_line_count()` / `stderr_line_count()`
are cheap progress gauges even while you stream.
- The [buffer policy and line handlers](commands.md#output-handling) apply to
streamed runs too — a handler sees each line on the pump, in addition to
your loop.
- The whole streaming surface is **hermetically testable**: a
`ScriptedRunner`'s `start()` returns a handle whose canned lines flow
through the same pump machinery — `stdout_lines`, the readiness probes, and
`finish_streamed` behave identically with no subprocess. See
[Testing → scripted streaming](testing.md#scripted-streaming).
## Interactive stdin
Conversational tools — write a request, read the response, repeat. Keep stdin
open with `keep_stdin_open()`, take the writer with `standard_input()`:
```rust,no_run
use processkit::{Command, StreamExt};
#[tokio::main]
async fn main() -> processkit::Result<()> {
// `bc` evaluates each stdin line and prints the result.
let mut run = Command::new("bc").keep_stdin_open().start().await?;
let mut stdin = run.standard_input().expect("stdin was kept open");
let mut answers = run.stdout_lines();
stdin.write_line("2 + 2").await?; // writes "2 + 2\n", flushed
println!("= {}", answers.next().await.unwrap());
stdin.write_line("6 * 7").await?;
println!("= {}", answers.next().await.unwrap());
stdin.finish().await?; // send EOF — bc exits
let (code, _stderr) = run.finish_streamed().await?;
assert_eq!(code, Some(0));
Ok(())
}
```
`ProcessStdin` offers `write(&[u8])`, `write_line(&str)` (newline + flush),
`flush()`, and `finish()` (EOF). Dropping the writer — or the whole
`RunningProcess` — closes stdin too; `finish()` just makes the EOF explicit
and awaitable.
For *one-directional* streamed input (a channel, a file tail) you don't need
interactivity — give the command `Stdin::from_lines(stream)` /
`Stdin::from_reader(reader)` and let the background writer feed it; see the
[stdin source table](commands.md#standard-input).
## Readiness probes
"Start a server, then use it" needs *ready*, not merely *started*. Three
probes replace the arbitrary sleep, each bounded by its own deadline:
```rust,no_run
use processkit::Command;
use std::time::Duration;
let mut run = Command::new("my-server").start().await?;
// 1. A line on stdout (returns the matching line):
let banner = run
.wait_for_line(|l| l.contains("listening on"), Duration::from_secs(10))
.await?;
// 2. A TCP port accepting connections:
run.wait_for_port("127.0.0.1:8080".parse().unwrap(), Duration::from_secs(10))
.await?;
// 3. Any async predicate (an HTTP /health endpoint, a file appearing, …):
run.wait_for(|| async { health_check().await }, Duration::from_secs(10))
.await?;
// ready — use the server…
```
Probe semantics, deliberately uniform:
- A probe that can't pass within its deadline fails with **`Error::NotReady`**
— distinct from `Error::Timeout`, which is the run's own deadline.
- A probe also fails *fast* once readiness can no longer happen: the child
exits, or (for `wait_for_line`) its stdout closes — no waiting out a 30s
deadline on a dead server.
- A failed probe **never kills the child.** You decide: retry, log and
continue, or tear down.
- `wait_for_line` consumes stdout up to (and including) the match — continue
with `finish_streamed` or further streaming. `wait_for_port` / `wait_for`
don't touch the pipes at all.
## Racing children with `wait_any`
The free function `wait_any` races several running processes and reports
whichever exits first — the natural primitive for "restart whatever died" or
"first answer wins":
```rust,no_run
use processkit::{Command, ProcessGroup, wait_any};
let group = ProcessGroup::new()?;
let mut a = group.start(&Command::new("replica-a")).await?;
let mut b = group.start(&Command::new("replica-b")).await?;
let (index, code) = wait_any(&mut [&mut a, &mut b]).await?;
println!("contender #{index} exited first with {code:?}");
// Only borrows: the loser is still usable.
let survivor = if index == 0 { &mut b } else { &mut a };
```
`wait_any` takes `&mut` borrows, applies no timeout of its own (wrap it in
`tokio::time::timeout` to bound the race), and does no output pumping — drain
chatty children first or give them bounded
[buffer policies](commands.md#output-handling).
## Per-run telemetry
With the default-on **`stats`** feature, a running child reports its own
resource usage, and `profile()` turns a whole run into a summary:
```rust,no_run
use processkit::Command;
use std::time::Duration;
let run = Command::new("crunch").start().await?;
run.cpu_time(); // Option<Duration> — user+kernel so far
run.peak_memory_bytes(); // Option<u64>
// …or capture + sample on an interval until exit:
let profile = Command::new("crunch")
.start().await?
.profile(Duration::from_millis(100))
.await?;
println!(
"exit={:?} wall={:?} cpu={:?} peak_rss={:?} avg_cpu={:?} ({} samples)",
profile.exit_code,
profile.duration,
profile.cpu_time,
profile.peak_memory_bytes,
profile.avg_cpu(), // cpu / wall — e.g. Some(1.7) ≈ 1.7 cores busy
profile.samples,
);
```
These read the *child process itself* (not a whole tree — that's
[`ProcessGroup::stats`](process-groups.md#stats-and-sampling)), and
availability follows the platform: full CPU/memory on Windows and Linux,
`None` where the kernel doesn't account per-process cheaply — see
[Platform support](platform-support.md).
---
Next: [Pipelines](pipelines.md) ·
[Timeouts, retries & cancellation](timeouts-and-cancellation.md) ·
[Supervision](supervision.md)