pub fn spawn_streaming<F>(
program: PathBuf,
args: Vec<String>,
env: Vec<(String, String)>,
cwd: PathBuf,
run_id: String,
callback: F,
) -> Result<ProcessHandle, StreamError>
Expand description
Spawn an arbitrary streaming child process — the generic engine behind every process-backed harness (bob, Claude Code, Codex).
Pipes stdout/stderr line-by-line through `callback` using the raw
`ProcessEvent` vocabulary (`Started` / `Stdout` / `Stderr` / `Error` /
`Exited`). `env` supplies per-harness secrets (each harness’s API-key
var, or none for self-authenticating CLIs). `PATH` is augmented so
Node-based CLIs find `node`. Returns a `ProcessHandle` for
cancellation.
`callback` is invoked from three threads (stdout reader, stderr
reader, exit watcher); the `Clone` bound lets us hand a copy to each.
`run_id` is opaque — the caller chooses it and uses it to correlate
events with the handle.
use cli_stream::{spawn_streaming, ProcessEvent};
use std::path::PathBuf;
let handle = spawn_streaming(
PathBuf::from("echo"),
vec!["hello".to_owned()],
Vec::new(), // extra env vars (key, value)
std::env::current_dir().unwrap(),
"run-1".to_owned(), // your correlation id
|event| match event {
ProcessEvent::Stdout { line, .. } => println!("{line}"),
ProcessEvent::Exited { exit_code, .. } => eprintln!("exit {exit_code:?}"),
_ => {}
},
)?;
// `handle.cancel()` stops it early; dropping the handle does not.
let _ = handle;
Examples found in repository?
examples/custom_harness.rs (lines 99-111)
82 fn run(&self, request: RunRequest, on_event: RunCallback) -> Result<RunHandle, HarnessError> {
83 // A real harness spawns its CLI here. We spawn `printf` to emit two
84 // JSON lines: an init (→ Session) and the answer (→ Text).
85 let answer = format!(r#"{{"text":"echo: {}"}}"#, request.prompt.replace('"', "'"));
86 let args = vec![
87 "%s\n".to_owned(),
88 r#"{"type":"init","model":"echo-1"}"#.to_owned(),
89 answer,
90 ];
91 let cwd = request
92 .cwd
93 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
94
95 // One parser per run. The engine's callback is `Fn + Send + Sync`
96 // (invoked from reader threads), so per-run state lives behind an
97 // `Arc<Mutex>` — the same shape the built-in bob/codex adapters use.
98 let parser = Arc::new(Mutex::new(EchoParser::default()));
99 let handle = spawn_streaming(
100 PathBuf::from("printf"),
101 args,
102 Vec::new(),
103 cwd,
104 request.run_id,
105 move |event| {
106 let mut parser = parser.lock().expect("echo parser mutex");
107 for ev in parser.on_process_event(event) {
108 (*on_event)(ev);
109 }
110 },
111 )
112 .map_err(HarnessError::spawn)?;
113 Ok(Box::new(handle))
114 }