Skip to main content

omne_cli/
claude_proc.rs

1//! `claude -p --output-format stream-json` subprocess client.
2//!
3//! Unit 9 substrate: drive a `claude` subprocess, parse its stream-json
4//! output, reconstruct assistant text into lines, and capture the
5//! concatenated text to `.omne/var/runs/<run_id>/nodes/<node_id>.out`.
6//! The sentinel scanner (Unit 10) and node executor (Unit 11) consume
7//! the reconstructed-line iterator exposed here.
8//!
9//! The module is intentionally split into two layers:
10//!
11//! - [`StreamParser`] — a generic `BufRead` → `AssistantLine` iterator.
12//!   Unit tests and downstream reuse (future replay/inspection tooling)
13//!   drive this directly with canned fixtures; no subprocess required.
14//! - [`ClaudeProcess`] — the subprocess wrapper that owns a `Child`,
15//!   drains its stderr on a background thread (so the pipe buffer can
16//!   never deadlock a long-running `claude -p` on a quiet run), and
17//!   delegates iteration to a `StreamParser<BufReader<ChildStdout>>`.
18//!
19//! Reconstruction contract (plan R8, spike 2026-04-15):
20//!
21//! - Each stream-json line is one envelope. We only care about
22//!   `{"type":"assistant", "message": { "id": ..., "content": [...] }}`
23//!   entries. All other top-level types are skipped — including
24//!   `system.hook_response`, which SessionStart hooks emit under `-p`
25//!   (spike-confirmed noise) and which must never leak into the
26//!   sentinel scanner.
27//! - `claude -p` may split a single logical message across multiple
28//!   envelopes that share `message.id`. Text blocks are therefore
29//!   accumulated into a per-`id` buffer and flushed when a new id is
30//!   observed or the stream ends. Flushing splits the buffer on `\n`,
31//!   trims each line, drops empties, and yields them in order.
32//! - All received text blocks are written verbatim to the capture file
33//!   as they arrive. No per-line rewriting: the capture is the raw
34//!   concatenation of every `text` block from every assistant message,
35//!   which matches the plan's "concatenated assistant text" contract.
36
37#![allow(dead_code)]
38
39use std::collections::{HashMap, VecDeque};
40use std::ffi::OsStr;
41use std::fs::{File, OpenOptions};
42use std::io::{BufRead, BufReader, Read, Write};
43use std::path::{Path, PathBuf};
44use std::process::{Child, ChildStdout, Command, ExitStatus, Stdio};
45use std::sync::{Arc, Mutex};
46use std::thread::{self, JoinHandle};
47use std::time::Duration;
48
49use thiserror::Error;
50use wait_timeout::ChildExt;
51
/// Executable name resolved through `PATH` when the caller supplies no
/// explicit binary path.
pub const DEFAULT_BIN: &str = "claude";

/// Upper bound on how long [`preflight`] waits for `claude --version`.
///
/// Generous enough to absorb a slow process spawn on Windows, yet short
/// enough that a wedged host binary is reported quickly. Streaming
/// invocations get no timeout from this module; node-level deadlines
/// belong to the Unit 11 executor.
pub const PREFLIGHT_TIMEOUT: Duration = Duration::from_secs(10);
61
62/// Errors surfaced by the `claude -p` subprocess client.
63#[derive(Debug, Error)]
64pub enum Error {
65    /// `claude` is not on `PATH`, or the explicit override path does
66    /// not point at an executable. Distinct from [`Error::Spawn`] so
67    /// `omne validate` and the executor can emit an install hint.
68    #[error(
69        "claude binary not found on PATH\n\
70         hint: install Claude Code and ensure `claude` is on PATH"
71    )]
72    HostMissing,
73
74    /// `std::process::Command::spawn` failed for a reason other than
75    /// the binary being missing (permission denied, I/O error on the
76    /// pipe allocation, etc).
77    #[error("failed to launch claude: {source}")]
78    Spawn {
79        #[source]
80        source: std::io::Error,
81    },
82
83    /// The preflight subprocess did not exit within
84    /// [`PREFLIGHT_TIMEOUT`]. The child is killed before this error is
85    /// returned so no zombie leaks.
86    #[error("claude did not exit within {elapsed:?}")]
87    Timeout { elapsed: Duration },
88
89    /// The child exited with a non-zero status. The stderr tail is
90    /// carried verbatim so callers can surface it in their own error
91    /// reporting.
92    #[error("claude exited with {status}\nstderr: {stderr}")]
93    ExitedNonZero { status: ExitStatus, stderr: String },
94
95    /// I/O error reading the child's stdout or awaiting its exit.
96    #[error("I/O error on claude stream: {source}")]
97    Io {
98        #[source]
99        source: std::io::Error,
100    },
101
102    /// I/O error writing the per-node capture file.
103    #[error("capture I/O on {path}: {source}")]
104    Capture {
105        path: PathBuf,
106        #[source]
107        source: std::io::Error,
108    },
109}
110
/// Which session flag a single `claude -p` invocation should carry.
///
/// Fixed by the Unit 0 spike: the first iteration of a
/// `fresh_context: false` loop passes `--session-id`, and every later
/// iteration passes `--resume`. The payload must be a UUID rather than
/// a ULID, because `claude --session-id` rejects anything else. The
/// value is allocated and supplied by Unit 11's loop controller.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Session {
    /// Opening invocation: pins the session id with
    /// `--session-id <uuid>`.
    New(String),
    /// Follow-up invocation: continues the session with
    /// `--resume <uuid>`.
    Resume(String),
}
126
127/// Arguments used to build a `claude -p` invocation.
128#[derive(Debug, Clone)]
129pub struct SpawnOpts {
130    /// Positional prompt argument passed last to `claude -p`.
131    pub prompt: String,
132    /// Working directory for the child process. Unit 11 sets this to
133    /// the per-run worktree.
134    pub cwd: PathBuf,
135    /// Optional `--model <id>`. Omitted when `None`.
136    pub model: Option<String>,
137    /// Comma-joined into `--allowed-tools`. Empty when the node places
138    /// no restriction.
139    pub allowed_tools: Vec<String>,
140    /// Loop session flag (Unit 11); `None` for one-shot nodes.
141    pub session: Option<Session>,
142    /// Escape hatch for per-distro flags (e.g. `--bare`,
143    /// `--dangerously-skip-permissions`). Appended verbatim before the
144    /// prompt.
145    pub extra_args: Vec<String>,
146    /// Environment variables exported to the spawned `claude` process.
147    /// Executor populates `OMNE_RUN_ID`, `OMNE_NODE_ID`,
148    /// `OMNE_VOLUME_ROOT`, and one `OMNE_INPUT_<KEY>` per `--input`;
149    /// bash nodes and gate hooks already get the same set via their
150    /// own paths.
151    pub env_vars: Vec<(String, String)>,
152    /// Override binary path. Defaults to [`DEFAULT_BIN`] on `PATH`.
153    pub bin: Option<PathBuf>,
154}
155
156impl SpawnOpts {
157    /// Minimal constructor for the common case: prompt + cwd, no
158    /// model / tools / session.
159    pub fn new(prompt: impl Into<String>, cwd: impl Into<PathBuf>) -> Self {
160        Self {
161            prompt: prompt.into(),
162            cwd: cwd.into(),
163            model: None,
164            allowed_tools: Vec::new(),
165            session: None,
166            extra_args: Vec::new(),
167            env_vars: Vec::new(),
168            bin: None,
169        }
170    }
171}
172
/// Open mode for the capture file owned by [`StreamParser`].
///
/// `Truncate` suits non-loop nodes and the first iteration of a loop
/// node: a replayed run must not inherit stale text from an earlier
/// attempt. `Append` suits later loop iterations, so that every
/// iteration lands in the same `nodes/<id>.out` (the executor writes
/// the iteration markers itself). The default is `Truncate`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CaptureMode {
    /// Discard any existing content before writing.
    #[default]
    Truncate,
    /// Keep existing content and write at end-of-file.
    Append,
}
186
/// A single line of assistant text after reconstruction.
///
/// `message_id` echoes the `message.id` of the `assistant` stream-json
/// envelope the text came from. The Unit 10 sentinel scanner matches on
/// `text` only and does not need it today; it is exposed anyway because
/// Unit 11 telemetry and future replay tooling want to know which turn
/// produced a given line.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AssistantLine {
    /// The trimmed, non-empty line of text.
    pub text: String,
    /// Id of the assistant message that produced `text`.
    pub message_id: String,
}
199
200/// Run `claude --version` and return the reported version string.
201///
202/// `bin = None` uses the `PATH`-resolved `claude` binary. An explicit
203/// path is useful for tests and for environments where multiple
204/// installations coexist.
205pub fn preflight(bin: Option<&Path>) -> Result<String, Error> {
206    let bin_os: &OsStr = bin
207        .map(|b| b.as_os_str())
208        .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
209
210    // Pre-probe for HostMissing so the error surfaces cleanly instead
211    // of going through the ambiguous `Command::spawn` error path.
212    if bin.is_none() && which::which(DEFAULT_BIN).is_err() {
213        return Err(Error::HostMissing);
214    }
215
216    let mut child = Command::new(bin_os)
217        .arg("--version")
218        .stdin(Stdio::null())
219        .stdout(Stdio::piped())
220        .stderr(Stdio::piped())
221        .spawn()
222        .map_err(map_spawn_error)?;
223
224    let status = match child
225        .wait_timeout(PREFLIGHT_TIMEOUT)
226        .map_err(|source| Error::Io { source })?
227    {
228        Some(status) => status,
229        None => {
230            let _ = child.kill();
231            let _ = child.wait();
232            return Err(Error::Timeout {
233                elapsed: PREFLIGHT_TIMEOUT,
234            });
235        }
236    };
237
238    let mut stdout = String::new();
239    if let Some(mut s) = child.stdout.take() {
240        s.read_to_string(&mut stdout)
241            .map_err(|source| Error::Io { source })?;
242    }
243    let mut stderr = String::new();
244    if let Some(mut s) = child.stderr.take() {
245        s.read_to_string(&mut stderr)
246            .map_err(|source| Error::Io { source })?;
247    }
248
249    if !status.success() {
250        return Err(Error::ExitedNonZero { status, stderr });
251    }
252    Ok(stdout.trim().to_string())
253}
254
255/// Build the `Command` that [`spawn`] would launch. Exposed so tests
256/// and Unit 11's executor can inspect / adjust the argv without
257/// actually spawning.
258pub fn build_command(opts: &SpawnOpts) -> Command {
259    let bin_os: &OsStr = opts
260        .bin
261        .as_deref()
262        .map(|p| p.as_os_str())
263        .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
264
265    let mut cmd = Command::new(bin_os);
266    cmd.current_dir(&opts.cwd);
267    for (k, v) in &opts.env_vars {
268        cmd.env(k, v);
269    }
270    cmd.args(["-p", "--output-format", "stream-json", "--verbose"]);
271
272    if let Some(model) = &opts.model {
273        cmd.args(["--model", model]);
274    }
275    if !opts.allowed_tools.is_empty() {
276        cmd.arg("--allowed-tools").arg(opts.allowed_tools.join(","));
277    }
278    match &opts.session {
279        Some(Session::New(uuid)) => {
280            cmd.args(["--session-id", uuid]);
281        }
282        Some(Session::Resume(uuid)) => {
283            cmd.args(["--resume", uuid]);
284        }
285        None => {}
286    }
287    for extra in &opts.extra_args {
288        cmd.arg(extra);
289    }
290
291    // Positional prompt goes last so it is unambiguous even if an
292    // `extra_args` entry starts with `-`.
293    cmd.arg(&opts.prompt);
294
295    cmd.stdin(Stdio::null());
296    cmd.stdout(Stdio::piped());
297    cmd.stderr(Stdio::piped());
298    cmd
299}
300
301/// Spawn `claude -p` per [`SpawnOpts`] and return the `Child`.
302///
303/// Stdin is redirected from `/dev/null` so the subprocess never waits
304/// for interactive input; stdout and stderr are piped so the caller
305/// can feed them to [`ClaudeProcess`].
306pub fn spawn(opts: &SpawnOpts) -> Result<Child, Error> {
307    build_command(opts).spawn().map_err(map_spawn_error)
308}
309
310/// Wire a live `Child` into a [`ClaudeProcess`] that yields
311/// [`AssistantLine`] values and writes the reconstructed text to
312/// `capture_path` (truncating any prior content).
313pub fn stream(child: Child, capture_path: &Path) -> Result<ClaudeProcess, Error> {
314    ClaudeProcess::from_child(child, capture_path)
315}
316
317/// Translate a `Command::spawn` failure into [`Error::HostMissing`]
318/// when the OS reports `NotFound`, otherwise [`Error::Spawn`]. Shared
319/// between [`preflight`] and [`spawn`].
320fn map_spawn_error(source: std::io::Error) -> Error {
321    if source.kind() == std::io::ErrorKind::NotFound {
322        Error::HostMissing
323    } else {
324        Error::Spawn { source }
325    }
326}
327
328/// Parses stream-json envelopes from any `BufRead` and yields
329/// [`AssistantLine`] values, while capturing all reconstructed text
330/// to a backing file.
331///
332/// The parser owns the capture file so iteration and capture advance
333/// in lockstep; callers who just want the lines can drop the iterator
334/// and the capture file closes on its own. Flushing happens on every
335/// message transition and on EOF.
336pub struct StreamParser<R: BufRead> {
337    reader: R,
338    capture: File,
339    capture_path: PathBuf,
340    /// Per-message-id text buffer. Keyed by `message.id` because
341    /// `claude -p` may emit multiple envelopes with the same id when
342    /// partial-message streaming is active.
343    buffers: HashMap<String, String>,
344    /// Insertion order of ids observed, so flush-on-EOF yields lines
345    /// in deterministic order across messages. We also push to this
346    /// on new-id observation, and pop the id when it is flushed.
347    order: VecDeque<String>,
348    /// Id of the most recently seen `assistant` envelope. Used to
349    /// detect id transitions (flush prior) without scanning `order`.
350    current_id: Option<String>,
351    /// Lines ready to be yielded. Populated at flush time.
352    pending: VecDeque<AssistantLine>,
353    finished: bool,
354}
355
356impl<R: BufRead> StreamParser<R> {
357    /// Build a parser over `reader`, creating or truncating
358    /// `capture_path` (and its parent directory) for the reconstructed
359    /// text. Equivalent to [`StreamParser::with_mode`] with
360    /// [`CaptureMode::Truncate`].
361    pub fn new(reader: R, capture_path: &Path) -> Result<Self, Error> {
362        Self::with_mode(reader, capture_path, CaptureMode::Truncate)
363    }
364
365    /// Build a parser over `reader` with explicit capture-open mode.
366    ///
367    /// [`CaptureMode::Truncate`] wipes prior content; [`CaptureMode::Append`]
368    /// positions writes at end-of-file so loop iterations accumulate in
369    /// one file (plan R11 "same `nodes/<id>.out` with iteration markers").
370    pub fn with_mode(reader: R, capture_path: &Path, mode: CaptureMode) -> Result<Self, Error> {
371        if let Some(parent) = capture_path.parent() {
372            if !parent.as_os_str().is_empty() {
373                std::fs::create_dir_all(parent).map_err(|source| Error::Capture {
374                    path: parent.to_path_buf(),
375                    source,
376                })?;
377            }
378        }
379        let mut open = OpenOptions::new();
380        open.create(true).write(true);
381        match mode {
382            CaptureMode::Truncate => {
383                open.truncate(true);
384            }
385            CaptureMode::Append => {
386                open.append(true);
387            }
388        }
389        let capture = open.open(capture_path).map_err(|source| Error::Capture {
390            path: capture_path.to_path_buf(),
391            source,
392        })?;
393        Ok(Self {
394            reader,
395            capture,
396            capture_path: capture_path.to_path_buf(),
397            buffers: HashMap::new(),
398            order: VecDeque::new(),
399            current_id: None,
400            pending: VecDeque::new(),
401            finished: false,
402        })
403    }
404
405    /// Ingest one raw line. Parses as JSON; dispatches on top-level
406    /// `type`. Returns `Ok(())` for skippable envelopes and for
407    /// malformed lines (logged + dropped per plan R-deferred
408    /// "Streamed JSON parsing resilience").
409    fn ingest_line(&mut self, raw: &str) -> Result<(), Error> {
410        let trimmed = raw.trim();
411        if trimmed.is_empty() {
412            return Ok(());
413        }
414        let env: serde_json::Value = match serde_json::from_str(trimmed) {
415            Ok(v) => v,
416            Err(err) => {
417                eprintln!("claude_proc: skipping malformed stream-json line: {err}");
418                return Ok(());
419            }
420        };
421        let ty = env.get("type").and_then(|t| t.as_str()).unwrap_or("");
422        if ty != "assistant" {
423            // Silent skip for the envelopes the plan explicitly
424            // excludes (system/user/result/rate_limit_event) and for
425            // unknown future types. `system.hook_response` lands here
426            // and is the load-bearing spike-confirmed noise source.
427            return Ok(());
428        }
429
430        let Some(message) = env.get("message") else {
431            return Ok(());
432        };
433        let id = message
434            .get("id")
435            .and_then(|v| v.as_str())
436            .unwrap_or("")
437            .to_string();
438
439        // New id ≠ currently-open id → flush the prior message first
440        // so the yield order is <prior lines>, then <this message's
441        // lines> (on subsequent ingest or EOF).
442        if let Some(prev) = self.current_id.as_ref() {
443            if prev != &id {
444                let prev = prev.clone();
445                self.flush_message(&prev);
446            }
447        }
448        if !self.buffers.contains_key(&id) {
449            self.order.push_back(id.clone());
450        }
451        self.current_id = Some(id.clone());
452
453        let empty = Vec::new();
454        let content = message
455            .get("content")
456            .and_then(|c| c.as_array())
457            .unwrap_or(&empty);
458        let mut appended = String::new();
459        for block in content {
460            let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
461            if block_type != "text" {
462                continue;
463            }
464            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
465                appended.push_str(text);
466            }
467        }
468        if !appended.is_empty() {
469            self.buffers.entry(id).or_default().push_str(&appended);
470            self.capture
471                .write_all(appended.as_bytes())
472                .map_err(|source| Error::Capture {
473                    path: self.capture_path.clone(),
474                    source,
475                })?;
476        }
477        Ok(())
478    }
479
480    /// Split `buffers[id]` on `\n`, trim and drop empties, push the
481    /// survivors onto `pending` in order. Also removes `id` from
482    /// `order` so the iterator does not re-flush it.
483    fn flush_message(&mut self, id: &str) {
484        let Some(buf) = self.buffers.remove(id) else {
485            return;
486        };
487        // Keep `order` in sync so EOF-flush does not re-yield.
488        if let Some(pos) = self.order.iter().position(|x| x == id) {
489            self.order.remove(pos);
490        }
491        for piece in buf.split('\n') {
492            let trimmed = piece.trim();
493            if trimmed.is_empty() {
494                continue;
495            }
496            self.pending.push_back(AssistantLine {
497                text: trimmed.to_string(),
498                message_id: id.to_string(),
499            });
500        }
501    }
502
503    /// Flush every outstanding buffer. Called when the stream reaches
504    /// EOF so trailing partial-line text from the last message still
505    /// reaches the consumer.
506    fn flush_all(&mut self) {
507        self.current_id = None;
508        while let Some(id) = self.order.pop_front() {
509            // flush_message removes from `order`, but since we've
510            // already drained it here, re-inserting the id as the
511            // argument and letting flush_message work on `buffers`
512            // only is fine.
513            let buf = match self.buffers.remove(&id) {
514                Some(b) => b,
515                None => continue,
516            };
517            for piece in buf.split('\n') {
518                let trimmed = piece.trim();
519                if trimmed.is_empty() {
520                    continue;
521                }
522                self.pending.push_back(AssistantLine {
523                    text: trimmed.to_string(),
524                    message_id: id.clone(),
525                });
526            }
527        }
528    }
529
530    /// Flush the capture file to disk. Called by
531    /// [`ClaudeProcess::finish`] before reporting exit status so a
532    /// crashing caller cannot leave buffered capture data unflushed.
533    fn flush_capture(&mut self) -> Result<(), Error> {
534        self.capture.flush().map_err(|source| Error::Capture {
535            path: self.capture_path.clone(),
536            source,
537        })
538    }
539}
540
541impl<R: BufRead> Iterator for StreamParser<R> {
542    type Item = Result<AssistantLine, Error>;
543
544    fn next(&mut self) -> Option<Self::Item> {
545        loop {
546            if let Some(line) = self.pending.pop_front() {
547                return Some(Ok(line));
548            }
549            if self.finished {
550                return None;
551            }
552            let mut raw = String::new();
553            match self.reader.read_line(&mut raw) {
554                Ok(0) => {
555                    self.finished = true;
556                    self.flush_all();
557                    if let Err(e) = self.flush_capture() {
558                        return Some(Err(e));
559                    }
560                }
561                Ok(_) => {
562                    if let Err(e) = self.ingest_line(&raw) {
563                        return Some(Err(e));
564                    }
565                }
566                Err(source) => {
567                    self.finished = true;
568                    return Some(Err(Error::Io { source }));
569                }
570            }
571        }
572    }
573}
574
575/// Subprocess wrapper around a live `claude -p` [`Child`] that yields
576/// [`AssistantLine`] values and, on [`ClaudeProcess::finish`], returns
577/// the child's exit status and captured stderr.
578///
579/// Stderr is drained on a dedicated thread as soon as the process is
580/// wrapped. Without this, a chatty stderr (warnings, deprecation
581/// notices) would fill its pipe buffer and block the child until
582/// somebody reads it — which no one does during a sentinel-driven
583/// loop iteration that only cares about stdout.
584pub struct ClaudeProcess {
585    parser: StreamParser<BufReader<ChildStdout>>,
586    child: Arc<Mutex<Option<Child>>>,
587    stderr_thread: Option<JoinHandle<Vec<u8>>>,
588}
589
/// Cloneable handle for killing a live [`ClaudeProcess`] from another
/// thread.
///
/// Obtained from [`ClaudeProcess::killer`]. The executor's timeout
/// watchdog keeps one while the main thread drains the parser. When
/// the deadline passes first, the watchdog calls [`ChildKiller::kill`],
/// and the main thread's next `read_line` returns `Ok(0)` once the
/// child's stdout pipe closes.
#[derive(Clone)]
pub struct ChildKiller {
    /// Slot shared with the owning [`ClaudeProcess`].
    child: Arc<Mutex<Option<Child>>>,
}

impl ChildKiller {
    /// Kill the child (SIGKILL or the platform equivalent) if the
    /// shared slot still holds it. Returns `Ok(())` without doing
    /// anything once [`ClaudeProcess::finish`] has taken the child out.
    /// A poisoned lock is recovered rather than propagated.
    pub fn kill(&self) -> std::io::Result<()> {
        let mut slot = self
            .child
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match slot.as_mut() {
            Some(child) => child.kill(),
            None => Ok(()),
        }
    }
}
617
618impl ClaudeProcess {
619    /// Wrap `child` and start draining its stderr.
620    ///
621    /// The child's `stdout` and `stderr` handles must both be piped
622    /// (i.e. the child was spawned via [`spawn`] or via
623    /// [`build_command`]). A missing stdout is fatal because the
624    /// parser has nothing to read. Equivalent to
625    /// [`ClaudeProcess::from_child_with_mode`] with
626    /// [`CaptureMode::Truncate`].
627    pub fn from_child(child: Child, capture_path: &Path) -> Result<Self, Error> {
628        Self::from_child_with_mode(child, capture_path, CaptureMode::Truncate)
629    }
630
631    /// Same as [`ClaudeProcess::from_child`] but with explicit capture
632    /// open mode. Loop iterations ≥ 2 pass [`CaptureMode::Append`] so
633    /// prior iterations' text is preserved.
634    pub fn from_child_with_mode(
635        mut child: Child,
636        capture_path: &Path,
637        mode: CaptureMode,
638    ) -> Result<Self, Error> {
639        let stdout = child.stdout.take().ok_or_else(|| Error::Io {
640            source: std::io::Error::new(
641                std::io::ErrorKind::BrokenPipe,
642                "claude child spawned without piped stdout",
643            ),
644        })?;
645        let stderr_thread = child.stderr.take().map(|mut s| {
646            thread::spawn(move || {
647                let mut buf = Vec::new();
648                let _ = s.read_to_end(&mut buf);
649                buf
650            })
651        });
652        let parser = StreamParser::with_mode(BufReader::new(stdout), capture_path, mode)?;
653        Ok(Self {
654            parser,
655            child: Arc::new(Mutex::new(Some(child))),
656            stderr_thread,
657        })
658    }
659
660    /// Clone a kill-handle for a timeout watchdog. See [`ChildKiller`].
661    pub fn killer(&self) -> ChildKiller {
662        ChildKiller {
663            child: Arc::clone(&self.child),
664        }
665    }
666
667    /// Drain any remaining lines from the stream, wait for the child
668    /// to exit, and return `(status, stderr)`.
669    ///
670    /// Does not call [`Error::ExitedNonZero`] itself — the caller
671    /// decides whether a non-zero exit is actionable. Unit 11's
672    /// executor maps this to [`Error::ExitedNonZero`] when the node
673    /// contract considers a non-zero status a failure.
674    pub fn finish(mut self) -> Result<(ExitStatus, String), Error> {
675        // Drain pending iterator output so capture contains everything
676        // the child produced before we close its stdout.
677        for item in self.parser.by_ref() {
678            item?;
679        }
680        self.parser.flush_capture()?;
681
682        let child_opt = {
683            let mut guard = match self.child.lock() {
684                Ok(g) => g,
685                Err(poisoned) => poisoned.into_inner(),
686            };
687            guard.take()
688        };
689        let mut child = child_opt.ok_or_else(|| Error::Io {
690            source: std::io::Error::new(
691                std::io::ErrorKind::BrokenPipe,
692                "ClaudeProcess::finish called with no live child — \
693                 finish already called or kill-drop left the Option None",
694            ),
695        })?;
696        let status = child.wait().map_err(|source| Error::Io { source })?;
697        let stderr_bytes = self
698            .stderr_thread
699            .take()
700            .map(|h| h.join().unwrap_or_default())
701            .unwrap_or_default();
702        let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
703        Ok((status, stderr))
704    }
705}
706
707impl Iterator for ClaudeProcess {
708    type Item = Result<AssistantLine, Error>;
709
710    fn next(&mut self) -> Option<Self::Item> {
711        self.parser.next()
712    }
713}
714
715#[cfg(test)]
716mod tests {
717    use super::*;
718    use std::io::Cursor;
719    use tempfile::TempDir;
720
721    // ---- argv construction ---------------------------------------------
722
723    fn collect_args(cmd: &Command) -> Vec<String> {
724        cmd.get_args()
725            .map(|a| a.to_string_lossy().into_owned())
726            .collect()
727    }
728
729    #[test]
730    fn build_command_sets_required_streaming_flags() {
731        let opts = SpawnOpts::new("go", "/tmp");
732        let cmd = build_command(&opts);
733        let args = collect_args(&cmd);
734        assert!(args
735            .windows(4)
736            .any(|w| w == ["-p", "--output-format", "stream-json", "--verbose"]));
737        assert_eq!(args.last().unwrap(), "go");
738    }
739
740    #[test]
741    fn build_command_omits_unset_model_and_tools() {
742        let args = collect_args(&build_command(&SpawnOpts::new("x", "/tmp")));
743        assert!(!args.iter().any(|a| a == "--model"));
744        assert!(!args.iter().any(|a| a == "--allowed-tools"));
745        assert!(!args.iter().any(|a| a == "--session-id"));
746        assert!(!args.iter().any(|a| a == "--resume"));
747    }
748
749    #[test]
750    fn build_command_forwards_env_vars() {
751        let mut opts = SpawnOpts::new("x", "/tmp");
752        opts.env_vars = vec![
753            ("OMNE_RUN_ID".into(), "feature-abc".into()),
754            ("OMNE_INPUT_FEATURE_NAME".into(), "hello".into()),
755        ];
756        let cmd = build_command(&opts);
757        let envs: std::collections::HashMap<_, _> = cmd
758            .get_envs()
759            .filter_map(|(k, v)| v.map(|vv| (k.to_os_string(), vv.to_os_string())))
760            .collect();
761        assert_eq!(
762            envs.get(std::ffi::OsStr::new("OMNE_RUN_ID"))
763                .map(|v| v.to_string_lossy().into_owned()),
764            Some("feature-abc".to_string())
765        );
766        assert_eq!(
767            envs.get(std::ffi::OsStr::new("OMNE_INPUT_FEATURE_NAME"))
768                .map(|v| v.to_string_lossy().into_owned()),
769            Some("hello".to_string())
770        );
771    }
772
773    #[test]
774    fn build_command_with_empty_env_vars_adds_none() {
775        // SpawnOpts::new leaves env_vars empty — the command must not
776        // carry any OMNE_* overrides in that case.
777        let cmd = build_command(&SpawnOpts::new("x", "/tmp"));
778        let has_omne = cmd
779            .get_envs()
780            .any(|(k, _)| k.to_string_lossy().starts_with("OMNE_"));
781        assert!(!has_omne, "unexpected OMNE_* env on bare SpawnOpts");
782    }
783
784    #[test]
785    fn build_command_joins_allowed_tools_with_commas() {
786        let mut opts = SpawnOpts::new("x", "/tmp");
787        opts.allowed_tools = vec!["Read".into(), "Bash".into()];
788        let args = collect_args(&build_command(&opts));
789        let tools_idx = args.iter().position(|a| a == "--allowed-tools").unwrap();
790        assert_eq!(args[tools_idx + 1], "Read,Bash");
791    }
792
793    #[test]
794    fn build_command_uses_session_id_for_new_session() {
795        let mut opts = SpawnOpts::new("x", "/tmp");
796        opts.session = Some(Session::New("abc-123".into()));
797        let args = collect_args(&build_command(&opts));
798        let idx = args.iter().position(|a| a == "--session-id").unwrap();
799        assert_eq!(args[idx + 1], "abc-123");
800        assert!(!args.iter().any(|a| a == "--resume"));
801    }
802
803    #[test]
804    fn build_command_uses_resume_for_continued_session() {
805        let mut opts = SpawnOpts::new("x", "/tmp");
806        opts.session = Some(Session::Resume("abc-123".into()));
807        let args = collect_args(&build_command(&opts));
808        let idx = args.iter().position(|a| a == "--resume").unwrap();
809        assert_eq!(args[idx + 1], "abc-123");
810        assert!(!args.iter().any(|a| a == "--session-id"));
811    }
812
813    #[test]
814    fn build_command_puts_prompt_last_even_with_extra_args() {
815        let mut opts = SpawnOpts::new("go do it", "/tmp");
816        opts.extra_args = vec!["--dangerously-skip-permissions".into()];
817        let args = collect_args(&build_command(&opts));
818        assert_eq!(args.last().unwrap(), "go do it");
819        let danger_idx = args
820            .iter()
821            .position(|a| a == "--dangerously-skip-permissions")
822            .unwrap();
823        let prompt_idx = args.iter().position(|a| a == "go do it").unwrap();
824        assert!(danger_idx < prompt_idx, "extra args precede the prompt");
825    }
826
827    // ---- parser fixtures -----------------------------------------------
828
829    fn asst(id: &str, text: &str) -> String {
830        serde_json::json!({
831            "type": "assistant",
832            "message": {
833                "id": id,
834                "content": [ { "type": "text", "text": text } ]
835            }
836        })
837        .to_string()
838    }
839
840    fn asst_multi(id: &str, texts: &[&str]) -> String {
841        let content: Vec<_> = texts
842            .iter()
843            .map(|t| serde_json::json!({ "type": "text", "text": t }))
844            .collect();
845        serde_json::json!({
846            "type": "assistant",
847            "message": { "id": id, "content": content }
848        })
849        .to_string()
850    }
851
852    fn run_parser(lines: &[String], capture: &Path) -> Vec<AssistantLine> {
853        let body = lines.join("\n") + "\n";
854        let parser = StreamParser::new(Cursor::new(body.into_bytes()), capture).expect("open");
855        parser.map(|r| r.expect("parse ok")).collect()
856    }
857
858    #[test]
859    fn parses_three_messages_totaling_five_lines() {
860        let tmp = TempDir::new().unwrap();
861        let cap = tmp.path().join("node.out");
862        let lines = vec![
863            asst("m1", "first\nsecond"),
864            asst("m2", "third"),
865            asst("m3", "fourth\nfifth"),
866        ];
867        let yielded = run_parser(&lines, &cap);
868        assert_eq!(
869            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
870            vec!["first", "second", "third", "fourth", "fifth"]
871        );
872        assert_eq!(yielded[0].message_id, "m1");
873        assert_eq!(yielded[4].message_id, "m3");
874    }
875
876    #[test]
877    fn joins_multiple_text_blocks_within_one_message() {
878        let tmp = TempDir::new().unwrap();
879        let cap = tmp.path().join("node.out");
880        // Four "deltas" as four text blocks inside the same message;
881        // the final text crosses an internal line boundary after join.
882        let line = asst_multi(
883            "m1",
884            &["first part ", "of line\nsecond ", "line split ", "too"],
885        );
886        let yielded = run_parser(&[line], &cap);
887        assert_eq!(
888            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
889            vec!["first part of line", "second line split too"]
890        );
891    }
892
893    #[test]
894    fn joins_partial_message_across_envelopes_sharing_id() {
895        // Same `message.id` across two envelopes — parser must
896        // accumulate, not yield twice.
897        let tmp = TempDir::new().unwrap();
898        let cap = tmp.path().join("node.out");
899        let lines = vec![asst("msame", "hello "), asst("msame", "world\ngoodbye")];
900        let yielded = run_parser(&lines, &cap);
901        assert_eq!(
902            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
903            vec!["hello world", "goodbye"]
904        );
905    }
906
907    #[test]
908    fn skips_malformed_stream_json_line_and_continues() {
909        let tmp = TempDir::new().unwrap();
910        let cap = tmp.path().join("node.out");
911        let lines = vec![
912            asst("m1", "ok"),
913            "{not valid json".to_string(),
914            asst("m2", "still ok"),
915        ];
916        let yielded = run_parser(&lines, &cap);
917        assert_eq!(
918            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
919            vec!["ok", "still ok"]
920        );
921    }
922
923    #[test]
924    fn skips_unknown_top_level_type() {
925        let tmp = TempDir::new().unwrap();
926        let cap = tmp.path().join("node.out");
927        let lines = vec![
928            serde_json::json!({ "type": "thinking", "content": "pondering" }).to_string(),
929            asst("m1", "visible"),
930        ];
931        let yielded = run_parser(&lines, &cap);
932        assert_eq!(
933            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
934            vec!["visible"]
935        );
936    }
937
938    #[test]
939    fn skips_system_hook_response_envelope() {
940        // Spike-confirmed noise: SessionStart hooks fire under `-p`.
941        let tmp = TempDir::new().unwrap();
942        let cap = tmp.path().join("node.out");
943        let lines = vec![
944            serde_json::json!({
945                "type": "system",
946                "subtype": "hook_response",
947                "hook_name": "SessionStart",
948                "output": "CAVEMAN MODE ACTIVE"
949            })
950            .to_string(),
951            asst("m1", "real content"),
952        ];
953        let yielded = run_parser(&lines, &cap);
954        assert_eq!(
955            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
956            vec!["real content"]
957        );
958    }
959
960    #[test]
961    fn skips_non_text_blocks_inside_assistant_message() {
962        let tmp = TempDir::new().unwrap();
963        let cap = tmp.path().join("node.out");
964        // tool_use and thinking blocks must not leak into scanner
965        // lines — only text blocks contribute.
966        let line = serde_json::json!({
967            "type": "assistant",
968            "message": {
969                "id": "m1",
970                "content": [
971                    { "type": "thinking", "thinking": "...internal..." },
972                    { "type": "text", "text": "visible line" },
973                    { "type": "tool_use", "name": "Read", "input": {} }
974                ]
975            }
976        })
977        .to_string();
978        let yielded = run_parser(&[line], &cap);
979        assert_eq!(
980            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
981            vec!["visible line"]
982        );
983    }
984
985    #[test]
986    fn trims_whitespace_around_reconstructed_lines() {
987        let tmp = TempDir::new().unwrap();
988        let cap = tmp.path().join("node.out");
989        let line = asst("m1", "   BLOCKED   \n  trailing\n");
990        let yielded = run_parser(&[line], &cap);
991        assert_eq!(
992            yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
993            vec!["BLOCKED", "trailing"]
994        );
995    }
996
997    #[test]
998    fn capture_file_mirrors_concatenated_assistant_text() {
999        let tmp = TempDir::new().unwrap();
1000        let cap = tmp.path().join("deep").join("node.out");
1001        let lines = vec![
1002            asst("m1", "alpha\n"),
1003            asst_multi("m2", ["beta ", "gamma"].as_ref()),
1004        ];
1005        let _ = run_parser(&lines, &cap);
1006        let mut written = String::new();
1007        File::open(&cap)
1008            .unwrap()
1009            .read_to_string(&mut written)
1010            .unwrap();
1011        assert_eq!(written, "alpha\nbeta gamma");
1012    }
1013
1014    #[test]
1015    fn empty_stream_yields_no_lines_and_creates_empty_capture() {
1016        let tmp = TempDir::new().unwrap();
1017        let cap = tmp.path().join("node.out");
1018        let parser = StreamParser::new(Cursor::new(Vec::<u8>::new()), &cap).unwrap();
1019        assert_eq!(parser.count(), 0);
1020        assert!(cap.exists());
1021        assert_eq!(std::fs::metadata(&cap).unwrap().len(), 0);
1022    }
1023
1024    #[test]
1025    fn preflight_missing_binary_is_host_missing() {
1026        // Non-existent explicit path: NotFound → HostMissing.
1027        let err = preflight(Some(Path::new("/definitely/not/a/binary/omne-xyz"))).unwrap_err();
1028        assert!(
1029            matches!(err, Error::HostMissing),
1030            "expected HostMissing, got {err:?}"
1031        );
1032    }
1033}