Skip to main content

omne_cli/
claude_proc.rs

1//! `claude -p --output-format stream-json` subprocess client.
2//!
3//! Unit 9 substrate: drive a `claude` subprocess, parse its stream-json
4//! output, reconstruct assistant text into lines, and capture the
5//! concatenated text to `.omne/var/runs/<run_id>/nodes/<node_id>.out`.
6//! The sentinel scanner (Unit 10) and node executor (Unit 11) consume
7//! the reconstructed-line iterator exposed here.
8//!
9//! The module is intentionally split into two layers:
10//!
11//! - [`StreamParser`] — a generic `BufRead` → `AssistantLine` iterator.
12//!   Unit tests and downstream reuse (future replay/inspection tooling)
13//!   drive this directly with canned fixtures; no subprocess required.
14//! - [`ClaudeProcess`] — the subprocess wrapper that owns a `Child`,
15//!   drains its stderr on a background thread (so the pipe buffer can
16//!   never deadlock a long-running `claude -p` on a quiet run), and
17//!   delegates iteration to a `StreamParser<BufReader<ChildStdout>>`.
18//!
19//! Reconstruction contract (plan R8, spike 2026-04-15):
20//!
21//! - Each stream-json line is one envelope. We only care about
22//!   `{"type":"assistant", "message": { "id": ..., "content": [...] }}`
23//!   entries. All other top-level types are skipped — including
24//!   `system.hook_response`, which SessionStart hooks emit under `-p`
25//!   (spike-confirmed noise) and which must never leak into the
26//!   sentinel scanner.
27//! - `claude -p` may split a single logical message across multiple
28//!   envelopes that share `message.id`. Text blocks are therefore
29//!   accumulated into a per-`id` buffer and flushed when a new id is
30//!   observed or the stream ends. Flushing splits the buffer on `\n`,
31//!   trims each line, drops empties, and yields them in order.
32//! - All received text blocks are written verbatim to the capture file
33//!   as they arrive. No per-line rewriting: the capture is the raw
34//!   concatenation of every `text` block from every assistant message,
35//!   which matches the plan's "concatenated assistant text" contract.
36
37#![allow(dead_code)]
38
39use std::collections::{HashMap, VecDeque};
40use std::ffi::OsStr;
41use std::fs::{File, OpenOptions};
42use std::io::{BufRead, BufReader, Read, Write};
43use std::path::{Path, PathBuf};
44use std::process::{Child, ChildStdout, Command, ExitStatus, Stdio};
45use std::sync::{Arc, Mutex};
46use std::thread::{self, JoinHandle};
47use std::time::Duration;
48
49use thiserror::Error;
50use wait_timeout::ChildExt;
51
/// Name of the executable looked up on `PATH` when no explicit binary
/// override is supplied.
pub const DEFAULT_BIN: &str = "claude";

/// Upper bound on how long [`preflight`] (`claude --version`) may run.
///
/// Generous enough for a slow process spawn on Windows, yet short
/// enough that a wedged host binary fails fast. Streaming invocations
/// are not bounded at this layer; node-level deadlines belong to the
/// Unit 11 executor.
pub const PREFLIGHT_TIMEOUT: Duration = Duration::from_secs(10);
61
62/// Errors surfaced by the `claude -p` subprocess client.
63#[derive(Debug, Error)]
64pub enum Error {
65    /// `claude` is not on `PATH`, or the explicit override path does
66    /// not point at an executable. Distinct from [`Error::Spawn`] so
67    /// `omne validate` and the executor can emit an install hint.
68    #[error(
69        "claude binary not found on PATH\n\
70         hint: install Claude Code and ensure `claude` is on PATH"
71    )]
72    HostMissing,
73
74    /// `std::process::Command::spawn` failed for a reason other than
75    /// the binary being missing (permission denied, I/O error on the
76    /// pipe allocation, etc).
77    #[error("failed to launch claude: {source}")]
78    Spawn {
79        #[source]
80        source: std::io::Error,
81    },
82
83    /// The preflight subprocess did not exit within
84    /// [`PREFLIGHT_TIMEOUT`]. The child is killed before this error is
85    /// returned so no zombie leaks.
86    #[error("claude did not exit within {elapsed:?}")]
87    Timeout { elapsed: Duration },
88
89    /// The child exited with a non-zero status. The stderr tail is
90    /// carried verbatim so callers can surface it in their own error
91    /// reporting.
92    #[error("claude exited with {status}\nstderr: {stderr}")]
93    ExitedNonZero { status: ExitStatus, stderr: String },
94
95    /// I/O error reading the child's stdout or awaiting its exit.
96    #[error("I/O error on claude stream: {source}")]
97    Io {
98        #[source]
99        source: std::io::Error,
100    },
101
102    /// I/O error writing the per-node capture file.
103    #[error("capture I/O on {path}: {source}")]
104    Capture {
105        path: PathBuf,
106        #[source]
107        source: std::io::Error,
108    },
109}
110
/// Which session flag a single `claude -p` invocation carries.
///
/// Per the Unit 0 spike, a `fresh_context: false` loop passes
/// `--session-id` on its first iteration and `--resume` on every
/// later one. The payload must be a UUID rather than a ULID because
/// `claude --session-id` rejects non-UUID strings. Unit 11's loop
/// controller allocates the value and hands it in.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Session {
    /// Opening invocation: pins the session id with
    /// `--session-id <uuid>`.
    New(String),
    /// Follow-up invocation: continues the session with
    /// `--resume <uuid>`.
    Resume(String),
}
126
127/// Arguments used to build a `claude -p` invocation.
128#[derive(Debug, Clone)]
129pub struct SpawnOpts {
130    /// Positional prompt argument passed last to `claude -p`.
131    pub prompt: String,
132    /// Working directory for the child process. Unit 11 sets this to
133    /// the per-run worktree.
134    pub cwd: PathBuf,
135    /// Optional `--model <id>`. Omitted when `None`.
136    pub model: Option<String>,
137    /// Comma-joined into `--allowed-tools`. Empty when the node places
138    /// no restriction.
139    pub allowed_tools: Vec<String>,
140    /// Loop session flag (Unit 11); `None` for one-shot nodes.
141    pub session: Option<Session>,
142    /// Escape hatch for per-distro flags (e.g. `--bare`,
143    /// `--dangerously-skip-permissions`). Appended verbatim before the
144    /// prompt.
145    pub extra_args: Vec<String>,
146    /// Override binary path. Defaults to [`DEFAULT_BIN`] on `PATH`.
147    pub bin: Option<PathBuf>,
148}
149
150impl SpawnOpts {
151    /// Minimal constructor for the common case: prompt + cwd, no
152    /// model / tools / session.
153    pub fn new(prompt: impl Into<String>, cwd: impl Into<PathBuf>) -> Self {
154        Self {
155            prompt: prompt.into(),
156            cwd: cwd.into(),
157            model: None,
158            allowed_tools: Vec::new(),
159            session: None,
160            extra_args: Vec::new(),
161            bin: None,
162        }
163    }
164}
165
/// Open mode for the capture file owned by [`StreamParser`].
///
/// `Truncate` (the default) suits non-loop nodes and the first
/// iteration of a loop: a replayed run must not inherit stale text
/// from an earlier attempt. `Append` suits later loop iterations so
/// that every iteration lands in the same `nodes/<id>.out`; the
/// executor writes the iteration markers itself.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CaptureMode {
    #[default]
    Truncate,
    Append,
}
179
/// A single line of assistant text after reconstruction.
///
/// `message_id` is the `message.id` of the `assistant` stream-json
/// envelope(s) the line came from. The Unit 10 sentinel scanner
/// matches on `text` only and does not need the id today; it is
/// exposed anyway so Unit 11 telemetry and future replay tooling can
/// tell which turn produced each line.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AssistantLine {
    pub text: String,
    pub message_id: String,
}
192
193/// Run `claude --version` and return the reported version string.
194///
195/// `bin = None` uses the `PATH`-resolved `claude` binary. An explicit
196/// path is useful for tests and for environments where multiple
197/// installations coexist.
198pub fn preflight(bin: Option<&Path>) -> Result<String, Error> {
199    let bin_os: &OsStr = bin
200        .map(|b| b.as_os_str())
201        .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
202
203    // Pre-probe for HostMissing so the error surfaces cleanly instead
204    // of going through the ambiguous `Command::spawn` error path.
205    if bin.is_none() && which::which(DEFAULT_BIN).is_err() {
206        return Err(Error::HostMissing);
207    }
208
209    let mut child = Command::new(bin_os)
210        .arg("--version")
211        .stdin(Stdio::null())
212        .stdout(Stdio::piped())
213        .stderr(Stdio::piped())
214        .spawn()
215        .map_err(map_spawn_error)?;
216
217    let status = match child
218        .wait_timeout(PREFLIGHT_TIMEOUT)
219        .map_err(|source| Error::Io { source })?
220    {
221        Some(status) => status,
222        None => {
223            let _ = child.kill();
224            let _ = child.wait();
225            return Err(Error::Timeout {
226                elapsed: PREFLIGHT_TIMEOUT,
227            });
228        }
229    };
230
231    let mut stdout = String::new();
232    if let Some(mut s) = child.stdout.take() {
233        s.read_to_string(&mut stdout)
234            .map_err(|source| Error::Io { source })?;
235    }
236    let mut stderr = String::new();
237    if let Some(mut s) = child.stderr.take() {
238        s.read_to_string(&mut stderr)
239            .map_err(|source| Error::Io { source })?;
240    }
241
242    if !status.success() {
243        return Err(Error::ExitedNonZero { status, stderr });
244    }
245    Ok(stdout.trim().to_string())
246}
247
248/// Build the `Command` that [`spawn`] would launch. Exposed so tests
249/// and Unit 11's executor can inspect / adjust the argv without
250/// actually spawning.
251pub fn build_command(opts: &SpawnOpts) -> Command {
252    let bin_os: &OsStr = opts
253        .bin
254        .as_deref()
255        .map(|p| p.as_os_str())
256        .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
257
258    let mut cmd = Command::new(bin_os);
259    cmd.current_dir(&opts.cwd);
260    cmd.args(["-p", "--output-format", "stream-json", "--verbose"]);
261
262    if let Some(model) = &opts.model {
263        cmd.args(["--model", model]);
264    }
265    if !opts.allowed_tools.is_empty() {
266        cmd.arg("--allowed-tools").arg(opts.allowed_tools.join(","));
267    }
268    match &opts.session {
269        Some(Session::New(uuid)) => {
270            cmd.args(["--session-id", uuid]);
271        }
272        Some(Session::Resume(uuid)) => {
273            cmd.args(["--resume", uuid]);
274        }
275        None => {}
276    }
277    for extra in &opts.extra_args {
278        cmd.arg(extra);
279    }
280
281    // Positional prompt goes last so it is unambiguous even if an
282    // `extra_args` entry starts with `-`.
283    cmd.arg(&opts.prompt);
284
285    cmd.stdin(Stdio::null());
286    cmd.stdout(Stdio::piped());
287    cmd.stderr(Stdio::piped());
288    cmd
289}
290
291/// Spawn `claude -p` per [`SpawnOpts`] and return the `Child`.
292///
293/// Stdin is redirected from `/dev/null` so the subprocess never waits
294/// for interactive input; stdout and stderr are piped so the caller
295/// can feed them to [`ClaudeProcess`].
296pub fn spawn(opts: &SpawnOpts) -> Result<Child, Error> {
297    build_command(opts).spawn().map_err(map_spawn_error)
298}
299
300/// Wire a live `Child` into a [`ClaudeProcess`] that yields
301/// [`AssistantLine`] values and writes the reconstructed text to
302/// `capture_path` (truncating any prior content).
303pub fn stream(child: Child, capture_path: &Path) -> Result<ClaudeProcess, Error> {
304    ClaudeProcess::from_child(child, capture_path)
305}
306
307/// Translate a `Command::spawn` failure into [`Error::HostMissing`]
308/// when the OS reports `NotFound`, otherwise [`Error::Spawn`]. Shared
309/// between [`preflight`] and [`spawn`].
310fn map_spawn_error(source: std::io::Error) -> Error {
311    if source.kind() == std::io::ErrorKind::NotFound {
312        Error::HostMissing
313    } else {
314        Error::Spawn { source }
315    }
316}
317
318/// Parses stream-json envelopes from any `BufRead` and yields
319/// [`AssistantLine`] values, while capturing all reconstructed text
320/// to a backing file.
321///
322/// The parser owns the capture file so iteration and capture advance
323/// in lockstep; callers who just want the lines can drop the iterator
324/// and the capture file closes on its own. Flushing happens on every
325/// message transition and on EOF.
326pub struct StreamParser<R: BufRead> {
327    reader: R,
328    capture: File,
329    capture_path: PathBuf,
330    /// Per-message-id text buffer. Keyed by `message.id` because
331    /// `claude -p` may emit multiple envelopes with the same id when
332    /// partial-message streaming is active.
333    buffers: HashMap<String, String>,
334    /// Insertion order of ids observed, so flush-on-EOF yields lines
335    /// in deterministic order across messages. We also push to this
336    /// on new-id observation, and pop the id when it is flushed.
337    order: VecDeque<String>,
338    /// Id of the most recently seen `assistant` envelope. Used to
339    /// detect id transitions (flush prior) without scanning `order`.
340    current_id: Option<String>,
341    /// Lines ready to be yielded. Populated at flush time.
342    pending: VecDeque<AssistantLine>,
343    finished: bool,
344}
345
346impl<R: BufRead> StreamParser<R> {
347    /// Build a parser over `reader`, creating or truncating
348    /// `capture_path` (and its parent directory) for the reconstructed
349    /// text. Equivalent to [`StreamParser::with_mode`] with
350    /// [`CaptureMode::Truncate`].
351    pub fn new(reader: R, capture_path: &Path) -> Result<Self, Error> {
352        Self::with_mode(reader, capture_path, CaptureMode::Truncate)
353    }
354
355    /// Build a parser over `reader` with explicit capture-open mode.
356    ///
357    /// [`CaptureMode::Truncate`] wipes prior content; [`CaptureMode::Append`]
358    /// positions writes at end-of-file so loop iterations accumulate in
359    /// one file (plan R11 "same `nodes/<id>.out` with iteration markers").
360    pub fn with_mode(reader: R, capture_path: &Path, mode: CaptureMode) -> Result<Self, Error> {
361        if let Some(parent) = capture_path.parent() {
362            if !parent.as_os_str().is_empty() {
363                std::fs::create_dir_all(parent).map_err(|source| Error::Capture {
364                    path: parent.to_path_buf(),
365                    source,
366                })?;
367            }
368        }
369        let mut open = OpenOptions::new();
370        open.create(true).write(true);
371        match mode {
372            CaptureMode::Truncate => {
373                open.truncate(true);
374            }
375            CaptureMode::Append => {
376                open.append(true);
377            }
378        }
379        let capture = open.open(capture_path).map_err(|source| Error::Capture {
380            path: capture_path.to_path_buf(),
381            source,
382        })?;
383        Ok(Self {
384            reader,
385            capture,
386            capture_path: capture_path.to_path_buf(),
387            buffers: HashMap::new(),
388            order: VecDeque::new(),
389            current_id: None,
390            pending: VecDeque::new(),
391            finished: false,
392        })
393    }
394
395    /// Ingest one raw line. Parses as JSON; dispatches on top-level
396    /// `type`. Returns `Ok(())` for skippable envelopes and for
397    /// malformed lines (logged + dropped per plan R-deferred
398    /// "Streamed JSON parsing resilience").
399    fn ingest_line(&mut self, raw: &str) -> Result<(), Error> {
400        let trimmed = raw.trim();
401        if trimmed.is_empty() {
402            return Ok(());
403        }
404        let env: serde_json::Value = match serde_json::from_str(trimmed) {
405            Ok(v) => v,
406            Err(err) => {
407                eprintln!("claude_proc: skipping malformed stream-json line: {err}");
408                return Ok(());
409            }
410        };
411        let ty = env.get("type").and_then(|t| t.as_str()).unwrap_or("");
412        if ty != "assistant" {
413            // Silent skip for the envelopes the plan explicitly
414            // excludes (system/user/result/rate_limit_event) and for
415            // unknown future types. `system.hook_response` lands here
416            // and is the load-bearing spike-confirmed noise source.
417            return Ok(());
418        }
419
420        let Some(message) = env.get("message") else {
421            return Ok(());
422        };
423        let id = message
424            .get("id")
425            .and_then(|v| v.as_str())
426            .unwrap_or("")
427            .to_string();
428
429        // New id ≠ currently-open id → flush the prior message first
430        // so the yield order is <prior lines>, then <this message's
431        // lines> (on subsequent ingest or EOF).
432        if let Some(prev) = self.current_id.as_ref() {
433            if prev != &id {
434                let prev = prev.clone();
435                self.flush_message(&prev);
436            }
437        }
438        if !self.buffers.contains_key(&id) {
439            self.order.push_back(id.clone());
440        }
441        self.current_id = Some(id.clone());
442
443        let empty = Vec::new();
444        let content = message
445            .get("content")
446            .and_then(|c| c.as_array())
447            .unwrap_or(&empty);
448        let mut appended = String::new();
449        for block in content {
450            let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
451            if block_type != "text" {
452                continue;
453            }
454            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
455                appended.push_str(text);
456            }
457        }
458        if !appended.is_empty() {
459            self.buffers.entry(id).or_default().push_str(&appended);
460            self.capture
461                .write_all(appended.as_bytes())
462                .map_err(|source| Error::Capture {
463                    path: self.capture_path.clone(),
464                    source,
465                })?;
466        }
467        Ok(())
468    }
469
470    /// Split `buffers[id]` on `\n`, trim and drop empties, push the
471    /// survivors onto `pending` in order. Also removes `id` from
472    /// `order` so the iterator does not re-flush it.
473    fn flush_message(&mut self, id: &str) {
474        let Some(buf) = self.buffers.remove(id) else {
475            return;
476        };
477        // Keep `order` in sync so EOF-flush does not re-yield.
478        if let Some(pos) = self.order.iter().position(|x| x == id) {
479            self.order.remove(pos);
480        }
481        for piece in buf.split('\n') {
482            let trimmed = piece.trim();
483            if trimmed.is_empty() {
484                continue;
485            }
486            self.pending.push_back(AssistantLine {
487                text: trimmed.to_string(),
488                message_id: id.to_string(),
489            });
490        }
491    }
492
493    /// Flush every outstanding buffer. Called when the stream reaches
494    /// EOF so trailing partial-line text from the last message still
495    /// reaches the consumer.
496    fn flush_all(&mut self) {
497        self.current_id = None;
498        while let Some(id) = self.order.pop_front() {
499            // flush_message removes from `order`, but since we've
500            // already drained it here, re-inserting the id as the
501            // argument and letting flush_message work on `buffers`
502            // only is fine.
503            let buf = match self.buffers.remove(&id) {
504                Some(b) => b,
505                None => continue,
506            };
507            for piece in buf.split('\n') {
508                let trimmed = piece.trim();
509                if trimmed.is_empty() {
510                    continue;
511                }
512                self.pending.push_back(AssistantLine {
513                    text: trimmed.to_string(),
514                    message_id: id.clone(),
515                });
516            }
517        }
518    }
519
520    /// Flush the capture file to disk. Called by
521    /// [`ClaudeProcess::finish`] before reporting exit status so a
522    /// crashing caller cannot leave buffered capture data unflushed.
523    fn flush_capture(&mut self) -> Result<(), Error> {
524        self.capture.flush().map_err(|source| Error::Capture {
525            path: self.capture_path.clone(),
526            source,
527        })
528    }
529}
530
531impl<R: BufRead> Iterator for StreamParser<R> {
532    type Item = Result<AssistantLine, Error>;
533
534    fn next(&mut self) -> Option<Self::Item> {
535        loop {
536            if let Some(line) = self.pending.pop_front() {
537                return Some(Ok(line));
538            }
539            if self.finished {
540                return None;
541            }
542            let mut raw = String::new();
543            match self.reader.read_line(&mut raw) {
544                Ok(0) => {
545                    self.finished = true;
546                    self.flush_all();
547                    if let Err(e) = self.flush_capture() {
548                        return Some(Err(e));
549                    }
550                }
551                Ok(_) => {
552                    if let Err(e) = self.ingest_line(&raw) {
553                        return Some(Err(e));
554                    }
555                }
556                Err(source) => {
557                    self.finished = true;
558                    return Some(Err(Error::Io { source }));
559                }
560            }
561        }
562    }
563}
564
565/// Subprocess wrapper around a live `claude -p` [`Child`] that yields
566/// [`AssistantLine`] values and, on [`ClaudeProcess::finish`], returns
567/// the child's exit status and captured stderr.
568///
569/// Stderr is drained on a dedicated thread as soon as the process is
570/// wrapped. Without this, a chatty stderr (warnings, deprecation
571/// notices) would fill its pipe buffer and block the child until
572/// somebody reads it — which no one does during a sentinel-driven
573/// loop iteration that only cares about stdout.
574pub struct ClaudeProcess {
575    parser: StreamParser<BufReader<ChildStdout>>,
576    child: Arc<Mutex<Option<Child>>>,
577    stderr_thread: Option<JoinHandle<Vec<u8>>>,
578}
579
/// Cloneable handle able to kill the child of a live
/// [`ClaudeProcess`].
///
/// Obtained from [`ClaudeProcess::killer`]. The executor's timeout
/// watchdog thread keeps one while the main thread drains the parser;
/// when the deadline passes before the stream ends, the watchdog calls
/// [`ChildKiller::kill`], and the main thread's next `read_line`
/// returns `Ok(0)` once the child's stdout pipe closes.
#[derive(Clone)]
pub struct ChildKiller {
    child: Arc<Mutex<Option<Child>>>,
}

impl ChildKiller {
    /// Kill the child (SIGKILL or the platform equivalent) if it is
    /// still held in the shared slot.
    ///
    /// Returns `Ok(())` without doing anything when the slot is empty,
    /// which is the case after [`ClaudeProcess::finish`] has taken the
    /// child. A poisoned lock is recovered rather than propagated.
    pub fn kill(&self) -> std::io::Result<()> {
        let mut slot = self
            .child
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match slot.as_mut() {
            Some(child) => child.kill(),
            None => Ok(()),
        }
    }
}
607
608impl ClaudeProcess {
609    /// Wrap `child` and start draining its stderr.
610    ///
611    /// The child's `stdout` and `stderr` handles must both be piped
612    /// (i.e. the child was spawned via [`spawn`] or via
613    /// [`build_command`]). A missing stdout is fatal because the
614    /// parser has nothing to read. Equivalent to
615    /// [`ClaudeProcess::from_child_with_mode`] with
616    /// [`CaptureMode::Truncate`].
617    pub fn from_child(child: Child, capture_path: &Path) -> Result<Self, Error> {
618        Self::from_child_with_mode(child, capture_path, CaptureMode::Truncate)
619    }
620
621    /// Same as [`ClaudeProcess::from_child`] but with explicit capture
622    /// open mode. Loop iterations ≥ 2 pass [`CaptureMode::Append`] so
623    /// prior iterations' text is preserved.
624    pub fn from_child_with_mode(
625        mut child: Child,
626        capture_path: &Path,
627        mode: CaptureMode,
628    ) -> Result<Self, Error> {
629        let stdout = child.stdout.take().ok_or_else(|| Error::Io {
630            source: std::io::Error::new(
631                std::io::ErrorKind::BrokenPipe,
632                "claude child spawned without piped stdout",
633            ),
634        })?;
635        let stderr_thread = child.stderr.take().map(|mut s| {
636            thread::spawn(move || {
637                let mut buf = Vec::new();
638                let _ = s.read_to_end(&mut buf);
639                buf
640            })
641        });
642        let parser = StreamParser::with_mode(BufReader::new(stdout), capture_path, mode)?;
643        Ok(Self {
644            parser,
645            child: Arc::new(Mutex::new(Some(child))),
646            stderr_thread,
647        })
648    }
649
650    /// Clone a kill-handle for a timeout watchdog. See [`ChildKiller`].
651    pub fn killer(&self) -> ChildKiller {
652        ChildKiller {
653            child: Arc::clone(&self.child),
654        }
655    }
656
657    /// Drain any remaining lines from the stream, wait for the child
658    /// to exit, and return `(status, stderr)`.
659    ///
660    /// Does not call [`Error::ExitedNonZero`] itself — the caller
661    /// decides whether a non-zero exit is actionable. Unit 11's
662    /// executor maps this to [`Error::ExitedNonZero`] when the node
663    /// contract considers a non-zero status a failure.
664    pub fn finish(mut self) -> Result<(ExitStatus, String), Error> {
665        // Drain pending iterator output so capture contains everything
666        // the child produced before we close its stdout.
667        for item in self.parser.by_ref() {
668            item?;
669        }
670        self.parser.flush_capture()?;
671
672        let child_opt = {
673            let mut guard = match self.child.lock() {
674                Ok(g) => g,
675                Err(poisoned) => poisoned.into_inner(),
676            };
677            guard.take()
678        };
679        let mut child = child_opt.ok_or_else(|| Error::Io {
680            source: std::io::Error::new(
681                std::io::ErrorKind::BrokenPipe,
682                "ClaudeProcess::finish called with no live child — \
683                 finish already called or kill-drop left the Option None",
684            ),
685        })?;
686        let status = child.wait().map_err(|source| Error::Io { source })?;
687        let stderr_bytes = self
688            .stderr_thread
689            .take()
690            .map(|h| h.join().unwrap_or_default())
691            .unwrap_or_default();
692        let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
693        Ok((status, stderr))
694    }
695}
696
697impl Iterator for ClaudeProcess {
698    type Item = Result<AssistantLine, Error>;
699
700    fn next(&mut self) -> Option<Self::Item> {
701        self.parser.next()
702    }
703}
704
705#[cfg(test)]
706mod tests {
707    use super::*;
708    use std::io::Cursor;
709    use tempfile::TempDir;
710
711    // ---- argv construction ---------------------------------------------
712
713    fn collect_args(cmd: &Command) -> Vec<String> {
714        cmd.get_args()
715            .map(|a| a.to_string_lossy().into_owned())
716            .collect()
717    }
718
719    #[test]
720    fn build_command_sets_required_streaming_flags() {
721        let opts = SpawnOpts::new("go", "/tmp");
722        let cmd = build_command(&opts);
723        let args = collect_args(&cmd);
724        assert!(args
725            .windows(4)
726            .any(|w| w == ["-p", "--output-format", "stream-json", "--verbose"]));
727        assert_eq!(args.last().unwrap(), "go");
728    }
729
730    #[test]
731    fn build_command_omits_unset_model_and_tools() {
732        let args = collect_args(&build_command(&SpawnOpts::new("x", "/tmp")));
733        assert!(!args.iter().any(|a| a == "--model"));
734        assert!(!args.iter().any(|a| a == "--allowed-tools"));
735        assert!(!args.iter().any(|a| a == "--session-id"));
736        assert!(!args.iter().any(|a| a == "--resume"));
737    }
738
739    #[test]
740    fn build_command_joins_allowed_tools_with_commas() {
741        let mut opts = SpawnOpts::new("x", "/tmp");
742        opts.allowed_tools = vec!["Read".into(), "Bash".into()];
743        let args = collect_args(&build_command(&opts));
744        let tools_idx = args.iter().position(|a| a == "--allowed-tools").unwrap();
745        assert_eq!(args[tools_idx + 1], "Read,Bash");
746    }
747
748    #[test]
749    fn build_command_uses_session_id_for_new_session() {
750        let mut opts = SpawnOpts::new("x", "/tmp");
751        opts.session = Some(Session::New("abc-123".into()));
752        let args = collect_args(&build_command(&opts));
753        let idx = args.iter().position(|a| a == "--session-id").unwrap();
754        assert_eq!(args[idx + 1], "abc-123");
755        assert!(!args.iter().any(|a| a == "--resume"));
756    }
757
758    #[test]
759    fn build_command_uses_resume_for_continued_session() {
760        let mut opts = SpawnOpts::new("x", "/tmp");
761        opts.session = Some(Session::Resume("abc-123".into()));
762        let args = collect_args(&build_command(&opts));
763        let idx = args.iter().position(|a| a == "--resume").unwrap();
764        assert_eq!(args[idx + 1], "abc-123");
765        assert!(!args.iter().any(|a| a == "--session-id"));
766    }
767
768    #[test]
769    fn build_command_puts_prompt_last_even_with_extra_args() {
770        let mut opts = SpawnOpts::new("go do it", "/tmp");
771        opts.extra_args = vec!["--dangerously-skip-permissions".into()];
772        let args = collect_args(&build_command(&opts));
773        assert_eq!(args.last().unwrap(), "go do it");
774        let danger_idx = args
775            .iter()
776            .position(|a| a == "--dangerously-skip-permissions")
777            .unwrap();
778        let prompt_idx = args.iter().position(|a| a == "go do it").unwrap();
779        assert!(danger_idx < prompt_idx, "extra args precede the prompt");
780    }
781
782    // ---- parser fixtures -----------------------------------------------
783
784    fn asst(id: &str, text: &str) -> String {
785        serde_json::json!({
786            "type": "assistant",
787            "message": {
788                "id": id,
789                "content": [ { "type": "text", "text": text } ]
790            }
791        })
792        .to_string()
793    }
794
795    fn asst_multi(id: &str, texts: &[&str]) -> String {
796        let content: Vec<_> = texts
797            .iter()
798            .map(|t| serde_json::json!({ "type": "text", "text": t }))
799            .collect();
800        serde_json::json!({
801            "type": "assistant",
802            "message": { "id": id, "content": content }
803        })
804        .to_string()
805    }
806
807    fn run_parser(lines: &[String], capture: &Path) -> Vec<AssistantLine> {
808        let body = lines.join("\n") + "\n";
809        let parser = StreamParser::new(Cursor::new(body.into_bytes()), capture).expect("open");
810        parser.map(|r| r.expect("parse ok")).collect()
811    }
812
813    #[test]
814    fn parses_three_messages_totaling_five_lines() {
815        let tmp = TempDir::new().unwrap();
816        let cap = tmp.path().join("node.out");
817        let lines = vec![
818            asst("m1", "first\nsecond"),
819            asst("m2", "third"),
820            asst("m3", "fourth\nfifth"),
821        ];
822        let yielded = run_parser(&lines, &cap);
823        assert_eq!(
824            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
825            vec!["first", "second", "third", "fourth", "fifth"]
826        );
827        assert_eq!(yielded[0].message_id, "m1");
828        assert_eq!(yielded[4].message_id, "m3");
829    }
830
831    #[test]
832    fn joins_multiple_text_blocks_within_one_message() {
833        let tmp = TempDir::new().unwrap();
834        let cap = tmp.path().join("node.out");
835        // Four "deltas" as four text blocks inside the same message;
836        // the final text crosses an internal line boundary after join.
837        let line = asst_multi(
838            "m1",
839            &["first part ", "of line\nsecond ", "line split ", "too"],
840        );
841        let yielded = run_parser(&[line], &cap);
842        assert_eq!(
843            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
844            vec!["first part of line", "second line split too"]
845        );
846    }
847
848    #[test]
849    fn joins_partial_message_across_envelopes_sharing_id() {
850        // Same `message.id` across two envelopes — parser must
851        // accumulate, not yield twice.
852        let tmp = TempDir::new().unwrap();
853        let cap = tmp.path().join("node.out");
854        let lines = vec![asst("msame", "hello "), asst("msame", "world\ngoodbye")];
855        let yielded = run_parser(&lines, &cap);
856        assert_eq!(
857            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
858            vec!["hello world", "goodbye"]
859        );
860    }
861
862    #[test]
863    fn skips_malformed_stream_json_line_and_continues() {
864        let tmp = TempDir::new().unwrap();
865        let cap = tmp.path().join("node.out");
866        let lines = vec![
867            asst("m1", "ok"),
868            "{not valid json".to_string(),
869            asst("m2", "still ok"),
870        ];
871        let yielded = run_parser(&lines, &cap);
872        assert_eq!(
873            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
874            vec!["ok", "still ok"]
875        );
876    }
877
878    #[test]
879    fn skips_unknown_top_level_type() {
880        let tmp = TempDir::new().unwrap();
881        let cap = tmp.path().join("node.out");
882        let lines = vec![
883            serde_json::json!({ "type": "thinking", "content": "pondering" }).to_string(),
884            asst("m1", "visible"),
885        ];
886        let yielded = run_parser(&lines, &cap);
887        assert_eq!(
888            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
889            vec!["visible"]
890        );
891    }
892
893    #[test]
894    fn skips_system_hook_response_envelope() {
895        // Spike-confirmed noise: SessionStart hooks fire under `-p`.
896        let tmp = TempDir::new().unwrap();
897        let cap = tmp.path().join("node.out");
898        let lines = vec![
899            serde_json::json!({
900                "type": "system",
901                "subtype": "hook_response",
902                "hook_name": "SessionStart",
903                "output": "CAVEMAN MODE ACTIVE"
904            })
905            .to_string(),
906            asst("m1", "real content"),
907        ];
908        let yielded = run_parser(&lines, &cap);
909        assert_eq!(
910            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
911            vec!["real content"]
912        );
913    }
914
915    #[test]
916    fn skips_non_text_blocks_inside_assistant_message() {
917        let tmp = TempDir::new().unwrap();
918        let cap = tmp.path().join("node.out");
919        // tool_use and thinking blocks must not leak into scanner
920        // lines — only text blocks contribute.
921        let line = serde_json::json!({
922            "type": "assistant",
923            "message": {
924                "id": "m1",
925                "content": [
926                    { "type": "thinking", "thinking": "...internal..." },
927                    { "type": "text", "text": "visible line" },
928                    { "type": "tool_use", "name": "Read", "input": {} }
929                ]
930            }
931        })
932        .to_string();
933        let yielded = run_parser(&[line], &cap);
934        assert_eq!(
935            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
936            vec!["visible line"]
937        );
938    }
939
940    #[test]
941    fn trims_whitespace_around_reconstructed_lines() {
942        let tmp = TempDir::new().unwrap();
943        let cap = tmp.path().join("node.out");
944        let line = asst("m1", "   BLOCKED   \n  trailing\n");
945        let yielded = run_parser(&[line], &cap);
946        assert_eq!(
947            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
948            vec!["BLOCKED", "trailing"]
949        );
950    }
951
952    #[test]
953    fn capture_file_mirrors_concatenated_assistant_text() {
954        let tmp = TempDir::new().unwrap();
955        let cap = tmp.path().join("deep").join("node.out");
956        let lines = vec![
957            asst("m1", "alpha\n"),
958            asst_multi("m2", ["beta ", "gamma"].as_ref()),
959        ];
960        let _ = run_parser(&lines, &cap);
961        let mut written = String::new();
962        File::open(&cap)
963            .unwrap()
964            .read_to_string(&mut written)
965            .unwrap();
966        assert_eq!(written, "alpha\nbeta gamma");
967    }
968
969    #[test]
970    fn empty_stream_yields_no_lines_and_creates_empty_capture() {
971        let tmp = TempDir::new().unwrap();
972        let cap = tmp.path().join("node.out");
973        let parser = StreamParser::new(Cursor::new(Vec::<u8>::new()), &cap).unwrap();
974        assert_eq!(parser.count(), 0);
975        assert!(cap.exists());
976        assert_eq!(std::fs::metadata(&cap).unwrap().len(), 0);
977    }
978
979    #[test]
980    fn preflight_missing_binary_is_host_missing() {
981        // Non-existent explicit path: NotFound → HostMissing.
982        let err = preflight(Some(Path::new("/definitely/not/a/binary/omne-xyz"))).unwrap_err();
983        assert!(
984            matches!(err, Error::HostMissing),
985            "expected HostMissing, got {err:?}"
986        );
987    }
988}