Skip to main content

spawn_streaming

Function spawn_streaming 

Source
pub fn spawn_streaming<F>(
    program: PathBuf,
    args: Vec<String>,
    env: Vec<(String, String)>,
    cwd: PathBuf,
    run_id: String,
    callback: F,
) -> Result<ProcessHandle, StreamError>
where F: FnMut(ProcessEvent) + Send + Sync + Clone + 'static,
Expand description

Spawn an arbitrary streaming child process — the generic engine behind every process-backed harness (bob, Claude Code, Codex).

Pipes stdout/stderr line-by-line through callback using the raw ProcessEvent vocabulary (Started / Stdout / Stderr / Error / Exited). env supplies per-harness secrets (each harness’s API-key var, or none for self-authenticating CLIs). PATH is augmented so Node-based CLIs find node. Returns a ProcessHandle for cancellation.

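callback is invoked from three threads (stdout reader, stderr reader, exit watcher); the Clone bound lets us hand a copy to each. Because every thread runs its own clone, anything the closure captures by value is duplicated rather than shared — keep per-run state behind an Arc<Mutex<…>> (as the repository example below does) if all events must observe it. run_id is opaque — the caller chooses it and uses it to correlate events with the handle.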

use cli_stream::{spawn_streaming, ProcessEvent};
use std::path::PathBuf;

let handle = spawn_streaming(
    PathBuf::from("echo"),
    vec!["hello".to_owned()],
    Vec::new(),                      // extra env vars (key, value)
    std::env::current_dir().unwrap(),
    "run-1".to_owned(),              // your correlation id
    |event| match event {
        ProcessEvent::Stdout { line, .. } => println!("{line}"),
        ProcessEvent::Exited { exit_code, .. } => eprintln!("exit {exit_code:?}"),
        _ => {}
    },
)?;
// `handle.cancel()` stops it early; dropping the handle does not.
let _ = handle;
Examples found in repository?
examples/custom_harness.rs (lines 99-111)
82    fn run(&self, request: RunRequest, on_event: RunCallback) -> Result<RunHandle, HarnessError> {
83        // A real harness spawns its CLI here. We spawn `printf` to emit two
84        // JSON lines: an init (→ Session) and the answer (→ Text).
85        let answer = format!(r#"{{"text":"echo: {}"}}"#, request.prompt.replace('"', "'"));
86        let args = vec![
87            "%s\n".to_owned(),
88            r#"{"type":"init","model":"echo-1"}"#.to_owned(),
89            answer,
90        ];
91        let cwd = request
92            .cwd
93            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
94
95        // One parser per run. The engine clones its `FnMut + Send + Sync + Clone`
96        // callback into each of its threads, so per-run state lives behind an
97        // `Arc<Mutex>` — the same shape the built-in bob/codex adapters use.
98        let parser = Arc::new(Mutex::new(EchoParser::default()));
99        let handle = spawn_streaming(
100            PathBuf::from("printf"),
101            args,
102            Vec::new(),
103            cwd,
104            request.run_id,
105            move |event| {
106                let mut parser = parser.lock().expect("echo parser mutex");
107                for ev in parser.on_process_event(event) {
108                    (*on_event)(ev);
109                }
110            },
111        )
112        .map_err(HarnessError::spawn)?;
113        Ok(Box::new(handle))
114    }