Skip to main content

omne_cli/
executor.rs

1//! Node executor (plan Unit 11).
2//!
3//! `dispatch(node, ctx)` is the single entry point the DAG runner in
4//! Unit 12 calls for each `ready()` node. Responsibilities:
5//!
6//! 1. Emit `node.started` before any subprocess spawn.
7//! 2. Dispatch to the node's kind — bash, AI (command / prompt), or
8//!    loop — and run the subprocess under a wall-clock deadline.
9//! 3. For loops, iterate under a sentinel [`Scanner`] that watches for
10//!    `BLOCKED` (always reserved) plus the user's `until` token;
11//!    session lifecycle follows the Unit 0 spike (`--session-id` first
12//!    iteration, `--resume` thereafter, UUID not ULID).
13//! 4. If the node carries a `gate`, run the platform-current hook
14//!    script under a 60s deadline after a successful node terminates.
15//! 5. Emit exactly one terminal event (`node.completed` or
16//!    `node.failed`) and, when a gate ran, a `gate.passed` event
17//!    between the node's body completing and the terminal event.
18//!
19//! The executor is the sole owner of node-level deadline handling —
20//! `claude_proc::ClaudeProcess` deliberately has no per-call timeout so
21//! that this module can decide, for example, that a loop's wall-clock
22//! budget covers all iterations rather than each one independently.
23
24#![allow(dead_code)]
25
26use std::io::{Read, Write};
27use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::mpsc;
30use std::thread;
31use std::time::{Duration, Instant};
32
33use ulid::Ulid;
34use uuid::Uuid;
35
36use crate::claude_proc::{self, CaptureMode, ChildKiller, ClaudeProcess, Session, SpawnOpts};
37use crate::event_log::EventLog;
38use crate::events::{
39    ErrorKind, Event, GateMethod, GatePassed, Input, IterationStarted, NodeCompleted, NodeError,
40    NodeFailed, NodeKind, NodeStarted,
41};
42use crate::pipe::{ExecutionKind, LoopBody, Node};
43use crate::sentinel::{self, Scanner};
44use crate::volume;
45
/// Fallback wall-clock budget for a node whose pipe entry declares no
/// `timeout:` — thirty minutes, i.e. the 1800s default recorded in the
/// plan's "Open questions deferred to implementation" block.
pub const DEFAULT_NODE_TIMEOUT: Duration = Duration::from_secs(30 * 60);

/// Wall-clock budget for a gate hook script (60s, per plan R12).
pub const GATE_TIMEOUT: Duration = Duration::from_secs(60);
53
54/// Inputs the executor needs that span every dispatch call on one run.
55///
56/// Built once by Unit 12's `omne run` handler and threaded through every
57/// node. Borrowed data everywhere; the executor never needs ownership.
58pub struct ExecutorContext<'a> {
59    /// `.omne/` volume root. Resolves worktree + events paths.
60    pub volume_root: &'a Path,
61    /// `run_id` (e.g. `feature-01arz3ndektsv4rrffq69g5fa0`).
62    pub run_id: &'a str,
63    /// Per-run worktree cwd (`.omne/wt/<run_id>`). All subprocesses
64    /// spawn with this as their cwd.
65    pub worktree: &'a Path,
66    /// Append-only log; executor emits `node.*` + `gate.passed` through
67    /// it.
68    pub event_log: &'a EventLog,
69    /// `--input k=v` pairs. Bash nodes export these as
70    /// `OMNE_INPUT_<KEY>`; AI nodes receive them via the gate hook env
71    /// and the pipe-level prompt template (not interpolated at the
72    /// runner in v1 — the pipe body references them).
73    pub inputs: &'a [Input],
74    /// Pipe-level `default_model:`. AI nodes inherit it unless they
75    /// specify their own `model:`.
76    pub default_model: Option<&'a str>,
77    /// Optional `claude` binary override, mainly a test seam.
78    pub claude_bin: Option<&'a Path>,
79    /// Wall-clock budget when a node does not override `timeout:`.
80    /// Defaults to [`DEFAULT_NODE_TIMEOUT`].
81    pub default_node_timeout: Duration,
82    /// Wall-clock budget for gate hook execution. Defaults to
83    /// [`GATE_TIMEOUT`] (60s per plan R12). Exposed on the context so
84    /// integration tests can drive a short gate-timeout path without
85    /// waiting a full minute.
86    pub gate_timeout: Duration,
87}
88
89impl<'a> ExecutorContext<'a> {
90    /// Convenience constructor that fills in the plan-default wall-clock
91    /// budgets. `worktree` is typically `.omne/wt/<run_id>` — the
92    /// runner computes it once and threads it through.
93    pub fn new(
94        volume_root: &'a Path,
95        run_id: &'a str,
96        worktree: &'a Path,
97        event_log: &'a EventLog,
98        inputs: &'a [Input],
99    ) -> Self {
100        Self {
101            volume_root,
102            run_id,
103            worktree,
104            event_log,
105            inputs,
106            default_model: None,
107            claude_bin: None,
108            default_node_timeout: DEFAULT_NODE_TIMEOUT,
109            gate_timeout: GATE_TIMEOUT,
110        }
111    }
112}
113
114/// Result of dispatching one node.
115///
116/// `Completed` and `Failed` map 1:1 to `node.completed` and
117/// `node.failed` in the event log; the runner threads the variant
118/// through `dag::Scheduler::mark`.
119#[derive(Debug, Clone, Eq, PartialEq)]
120pub enum NodeOutcome {
121    Completed,
122    Failed {
123        kind: ErrorKind,
124        message: Option<String>,
125    },
126}
127
128/// Dispatch one node end-to-end.
129///
130/// Emits `node.started` on entry, runs the node body, runs any attached
131/// gate, and emits exactly one terminal event (`node.completed` or
132/// `node.failed`). The returned [`NodeOutcome`] mirrors the terminal
133/// event so the runner can feed the scheduler without re-reading the
134/// event log.
135pub fn dispatch(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
136    let kind = node
137        .execution_kind()
138        .ok_or_else(|| DispatchError::InvalidNode {
139            node_id: node.id.clone(),
140            reason: "node has no execution kind (validator should have caught this)".into(),
141        })?;
142
143    let event_kind = match kind {
144        ExecutionKind::Command => NodeKind::Command,
145        ExecutionKind::Prompt => NodeKind::Prompt,
146        ExecutionKind::Bash => NodeKind::Bash,
147        ExecutionKind::Loop => NodeKind::Loop,
148    };
149
150    ctx.event_log
151        .append(&Event::NodeStarted(NodeStarted {
152            id: new_event_id(),
153            ts: iso_utc_now(),
154            run_id: ctx.run_id.to_string(),
155            node_id: node.id.clone(),
156            kind: event_kind,
157            name: None,
158            model: effective_model(node, ctx).map(str::to_string),
159        }))
160        .map_err(DispatchError::from)?;
161
162    // Compute the body + gate outcome with infrastructure errors
163    // folded into a `NodeFailed{Crash}` so `node.started` is always
164    // paired with exactly one terminal event. The only residual orphan
165    // window is `emit_terminal` itself failing — at that point the
166    // event log is unwritable and no compensation is possible.
167    let final_outcome = body_with_gate(node, kind, ctx);
168    emit_terminal(node, &final_outcome, ctx)?;
169    Ok(final_outcome)
170}
171
172/// Run the node body (and gate, if any) returning a `NodeOutcome` that
173/// is safe to emit as a terminal event. Dispatch-level infrastructure
174/// errors — failed spawns, event-log writes during the body, capture
175/// I/O — are converted to `NodeFailed{Crash}` with the error message
176/// embedded, so the recorded event stream never contains a dangling
177/// `node.started`.
178fn body_with_gate(node: &Node, kind: ExecutionKind, ctx: &ExecutorContext<'_>) -> NodeOutcome {
179    let body_outcome = match kind {
180        ExecutionKind::Bash => run_bash(node, ctx),
181        ExecutionKind::Command | ExecutionKind::Prompt => run_ai(node, ctx),
182        ExecutionKind::Loop => run_loop(node, ctx),
183    };
184    let body_outcome = match body_outcome {
185        Ok(o) => o,
186        Err(e) => {
187            return NodeOutcome::Failed {
188                kind: ErrorKind::Crash,
189                message: Some(format!("dispatch error in node body: {e}")),
190            }
191        }
192    };
193    match body_outcome {
194        NodeOutcome::Completed => match node.gate.as_deref() {
195            Some(gate) => match run_gate(node, gate, ctx) {
196                Ok(o) => o,
197                Err(e) => NodeOutcome::Failed {
198                    kind: ErrorKind::GateFailed,
199                    message: Some(format!("dispatch error in gate: {e}")),
200                },
201            },
202            None => NodeOutcome::Completed,
203        },
204        other => other,
205    }
206}
207
208// ── Bash path ──────────────────────────────────────────────────────
209
210fn run_bash(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
211    let bash_body = node
212        .bash
213        .as_deref()
214        .ok_or_else(|| DispatchError::InvalidNode {
215            node_id: node.id.clone(),
216            reason: "bash node missing bash: body".into(),
217        })?;
218
219    let capture_path = node_capture_path(ctx, &node.id);
220    ensure_parent_dir(&capture_path)?;
221
222    let mut cmd = bash_command(bash_body);
223    cmd.current_dir(ctx.worktree)
224        .stdin(Stdio::null())
225        .stdout(Stdio::piped())
226        .stderr(Stdio::piped());
227    for input in ctx.inputs {
228        let env_key = format!("OMNE_INPUT_{}", input.key.to_uppercase());
229        cmd.env(env_key, &input.value);
230    }
231
232    let budget = node_timeout(node, ctx);
233    let outcome = run_command_with_timeout(&mut cmd, budget, Some(&capture_path))?;
234    Ok(outcome_from_exit(outcome))
235}
236
/// Windows flavour: wrap `body` as `cmd /S /C "<body>"`.
///
/// The whole argument string is passed with `raw_arg`, which skips
/// Rust's default Windows argument escaping. That escaping would
/// double-escape double quotes inside `body`, and `cmd /S /C "..."`
/// would then mis-parse them. With `/S`, `cmd.exe` strips exactly the
/// outer pair of quotes and interprets the rest verbatim, so `"`
/// characters inside the body survive.
///
/// cmd's own re-parsing of metacharacters (`&`, `|`, `%VAR%`) is not
/// neutralised here; that would need PowerShell or WSL and is
/// documented deferred scope.
#[cfg(windows)]
fn bash_command(body: &str) -> Command {
    use std::os::windows::process::CommandExt;

    let raw_line = format!("/S /C \"{body}\"");
    let mut shell = Command::new("cmd");
    shell.raw_arg(raw_line);
    shell
}
256
/// Non-Windows flavour: run `body` as `sh -c <body>`. The body is
/// passed as a single argument, so it reaches the shell unmodified.
#[cfg(not(windows))]
fn bash_command(body: &str) -> Command {
    let mut shell = Command::new("sh");
    shell.args(["-c", body]);
    shell
}
263
264// ── AI path (command / prompt) ────────────────────────────────────
265
266fn run_ai(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
267    let prompt = ai_prompt_for(node)?;
268    let capture_path = node_capture_path(ctx, &node.id);
269    ensure_parent_dir(&capture_path)?;
270
271    let opts = build_spawn_opts(node, ctx, prompt, None);
272    let budget = node_timeout(node, ctx);
273    let outcome = run_claude_iteration(&opts, &capture_path, CaptureMode::Truncate, budget, &[])?;
274
275    Ok(match outcome {
276        ClaudeOutcome::CleanExit => NodeOutcome::Completed,
277        ClaudeOutcome::SentinelHit { token: _ } => {
278            // Only registered sentinel for non-loop AI nodes is BLOCKED.
279            NodeOutcome::Failed {
280                kind: ErrorKind::Blocked,
281                message: Some("assistant emitted BLOCKED".into()),
282            }
283        }
284        other => claude_failure(other, node, budget),
285    })
286}
287
288/// Translate the non-`CleanExit`, non-`SentinelHit` arms of
289/// [`ClaudeOutcome`] into [`NodeOutcome::Failed`]. Shared by `run_ai`
290/// and `run_loop` so their error-message shapes do not drift. The
291/// sentinel-hit and clean-exit cases differ per caller (the AI path
292/// treats any hit as BLOCKED; the loop path discriminates BLOCKED vs
293/// `until`), so they stay inline at each call site.
294fn claude_failure(outcome: ClaudeOutcome, node: &Node, budget: Duration) -> NodeOutcome {
295    match outcome {
296        ClaudeOutcome::CleanExit | ClaudeOutcome::SentinelHit { .. } => {
297            unreachable!("claude_failure invoked on a non-failure outcome");
298        }
299        ClaudeOutcome::Timeout => NodeOutcome::Failed {
300            kind: ErrorKind::Timeout,
301            message: Some(format!(
302                "node {} exceeded timeout of {}s",
303                node.id,
304                budget.as_secs()
305            )),
306        },
307        ClaudeOutcome::HostMissing => NodeOutcome::Failed {
308            kind: ErrorKind::HostMissing,
309            message: Some("claude binary not found on PATH".into()),
310        },
311        ClaudeOutcome::Crash { stderr_tail } => NodeOutcome::Failed {
312            kind: ErrorKind::Crash,
313            message: Some(stderr_tail),
314        },
315    }
316}
317
318fn ai_prompt_for(node: &Node) -> Result<String, DispatchError> {
319    if let Some(prompt) = &node.prompt {
320        return Ok(prompt.clone());
321    }
322    if let Some(command) = &node.command {
323        // v1 convention: commands map to Claude Code slash commands.
324        return Ok(format!("/{command}"));
325    }
326    if node.loop_.is_some() {
327        return Err(DispatchError::InvalidNode {
328            node_id: node.id.clone(),
329            reason: "loop body should route through run_loop, not run_ai".into(),
330        });
331    }
332    Err(DispatchError::InvalidNode {
333        node_id: node.id.clone(),
334        reason: "AI node carried neither prompt: nor command:".into(),
335    })
336}
337
338// ── Loop path ────────────────────────────────────────────────────────
339
340fn run_loop(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
341    let body = node
342        .loop_
343        .as_ref()
344        .ok_or_else(|| DispatchError::InvalidNode {
345            node_id: node.id.clone(),
346            reason: "loop dispatch on a non-loop node".into(),
347        })?;
348
349    let prompt = loop_body_prompt(node, body)?;
350    let capture_path = node_capture_path(ctx, &node.id);
351    ensure_parent_dir(&capture_path)?;
352
353    // Pre-allocate the session UUID when the loop wants a resumed
354    // session. `claude --session-id` requires a UUID (spike-locked).
355    let session_uuid = if !body.fresh_context {
356        Some(Uuid::new_v4().to_string())
357    } else {
358        None
359    };
360
361    let until_tokens: Vec<String> = vec![body.until.clone()];
362    let budget = node_timeout(node, ctx);
363    let deadline = Instant::now() + budget;
364
365    for iter in 1..=body.max_iterations {
366        // Deadline first: do not mutate the capture file after the
367        // wall-clock budget has expired. This prevents dangling
368        // `--- iteration N ---` headers with no body below on
369        // timeout: 0 / near-exhausted-budget runs.
370        let remaining = deadline.saturating_duration_since(Instant::now());
371        if remaining.is_zero() {
372            return Ok(NodeOutcome::Failed {
373                kind: ErrorKind::Timeout,
374                message: Some(format!(
375                    "loop {} exceeded timeout of {}s",
376                    node.id,
377                    budget.as_secs()
378                )),
379            });
380        }
381
382        // Iter 1 truncates any stale capture from a prior dispatch of
383        // this node; subsequent iterations append below the iter-1
384        // marker. StreamParser then opens the file with Append so its
385        // writes land after our header.
386        if iter == 1 {
387            truncate_capture(&capture_path)?;
388        }
389        let byte_offset = write_iteration_marker(&capture_path, iter)?;
390        ctx.event_log
391            .append(&Event::IterationStarted(IterationStarted {
392                id: new_event_id(),
393                ts: iso_utc_now(),
394                run_id: ctx.run_id.to_string(),
395                node_id: node.id.clone(),
396                iteration: iter,
397                byte_offset,
398            }))
399            .map_err(DispatchError::from)?;
400
401        let session = session_uuid.as_ref().map(|uuid| {
402            if iter == 1 {
403                Session::New(uuid.clone())
404            } else {
405                Session::Resume(uuid.clone())
406            }
407        });
408
409        let opts = build_spawn_opts(node, ctx, prompt.clone(), session);
410
411        let outcome = run_claude_iteration(
412            &opts,
413            &capture_path,
414            CaptureMode::Append,
415            remaining,
416            &until_tokens,
417        )?;
418
419        match outcome {
420            ClaudeOutcome::SentinelHit { token } => {
421                // BLOCKED always takes priority over the user's
422                // `until` token — even if a malformed pipe somehow
423                // slipped `until: BLOCKED` past the validator.
424                if token == sentinel::BLOCKED {
425                    return Ok(NodeOutcome::Failed {
426                        kind: ErrorKind::Blocked,
427                        message: Some(format!(
428                            "loop {} assistant emitted BLOCKED on iteration {iter}",
429                            node.id
430                        )),
431                    });
432                }
433                if token == body.until {
434                    return Ok(NodeOutcome::Completed);
435                }
436                // Scanner surfaces only BLOCKED or the registered
437                // `until` token; no other token can reach here.
438                return Err(DispatchError::InvalidNode {
439                    node_id: node.id.clone(),
440                    reason: format!("scanner returned unexpected token `{token}`"),
441                });
442            }
443            ClaudeOutcome::CleanExit => {
444                // Iteration ended without firing the `until` sentinel.
445                // Continue to the next iteration.
446            }
447            failure => return Ok(claude_failure(failure, node, budget)),
448        }
449    }
450
451    Ok(NodeOutcome::Failed {
452        kind: ErrorKind::MaxIterationsExceeded,
453        message: Some(format!(
454            "loop {} exhausted {} iterations without matching `{}`",
455            node.id, body.max_iterations, body.until
456        )),
457    })
458}
459
460fn loop_body_prompt(node: &Node, body: &LoopBody) -> Result<String, DispatchError> {
461    if let Some(prompt) = &body.prompt {
462        return Ok(prompt.clone());
463    }
464    if let Some(command) = &body.command {
465        return Ok(format!("/{command}"));
466    }
467    Err(DispatchError::InvalidNode {
468        node_id: node.id.clone(),
469        reason: "loop body carries neither prompt: nor command:".into(),
470    })
471}
472
473/// Namespaced, machine-parseable iteration marker line. The `omne:`
474/// prefix plus the trailing sentinel `===` make a collision with normal
475/// assistant prose vanishingly unlikely (unlike the previous
476/// `--- iteration N ---` form, which clashes with Markdown setext
477/// headings and `git diff` hunk headers). Agents that still prefer
478/// structured boundaries should read `iteration.started` from the event
479/// log, which also carries the byte offset where each iteration's
480/// content begins.
481const ITERATION_MARKER_PREFIX: &str = "=== omne:iteration:";
482
483/// Append an iteration marker line to `capture_path` and return the
484/// byte offset at which the iteration's assistant output will start —
485/// i.e. the file size immediately after the marker line is written.
486/// Agents slice `[byte_offset_N .. byte_offset_{N+1})` to reconstruct a
487/// single iteration's text without having to re-parse markers.
488fn write_iteration_marker(capture_path: &Path, iter: u32) -> Result<u64, DispatchError> {
489    let mut f = std::fs::OpenOptions::new()
490        .create(true)
491        .append(true)
492        .open(capture_path)
493        .map_err(|source| DispatchError::Io {
494            path: capture_path.to_path_buf(),
495            source,
496        })?;
497    writeln!(f, "\n{ITERATION_MARKER_PREFIX}{iter} ===").map_err(|source| DispatchError::Io {
498        path: capture_path.to_path_buf(),
499        source,
500    })?;
501    f.sync_data().map_err(|source| DispatchError::Io {
502        path: capture_path.to_path_buf(),
503        source,
504    })?;
505    let meta = std::fs::metadata(capture_path).map_err(|source| DispatchError::Io {
506        path: capture_path.to_path_buf(),
507        source,
508    })?;
509    Ok(meta.len())
510}
511
512/// Truncate `capture_path` to zero bytes, creating it if absent. Used
513/// by loop iteration 1 so a re-dispatch of a failed loop against the
514/// same `run_id` starts from a clean capture file rather than stacking
515/// on top of prior-run output.
516fn truncate_capture(capture_path: &Path) -> Result<(), DispatchError> {
517    std::fs::OpenOptions::new()
518        .create(true)
519        .write(true)
520        .truncate(true)
521        .open(capture_path)
522        .map(|_| ())
523        .map_err(|source| DispatchError::Io {
524            path: capture_path.to_path_buf(),
525            source,
526        })
527}
528
529// ── Claude iteration runner (shared by AI + loop) ────────────────
530
/// How the executor classifies one finished `claude -p` invocation.
enum ClaudeOutcome {
    /// The process exited successfully and no sentinel fired.
    CleanExit,
    /// The scanner matched `token` in the assistant's output.
    SentinelHit { token: String },
    /// The watchdog killed the process when the budget ran out.
    Timeout,
    /// The `claude` binary could not be found at spawn time.
    HostMissing,
    /// The stream failed or the process exited unsuccessfully;
    /// `stderr_tail` holds the diagnostic text.
    Crash { stderr_tail: String },
}
540
541/// Run one `claude -p` invocation and interpret the stream line-by-line.
542///
543/// A background watchdog thread kills the child when `budget` elapses;
544/// the main thread drains the parser, feeds each line to a
545/// [`Scanner`], and returns as soon as a sentinel hits (or the stream
546/// closes). `capture_mode` lets the loop controller preserve prior
547/// iterations' text with [`CaptureMode::Append`].
548fn run_claude_iteration(
549    opts: &SpawnOpts,
550    capture_path: &Path,
551    capture_mode: CaptureMode,
552    budget: Duration,
553    until_tokens: &[String],
554) -> Result<ClaudeOutcome, DispatchError> {
555    let child = match claude_proc::spawn(opts) {
556        Ok(c) => c,
557        Err(claude_proc::Error::HostMissing) => return Ok(ClaudeOutcome::HostMissing),
558        Err(other) => return Err(DispatchError::from(other)),
559    };
560
561    let proc = ClaudeProcess::from_child_with_mode(child, capture_path, capture_mode)?;
562    let killer = proc.killer();
563    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
564    let watchdog = spawn_watchdog(killer.clone(), cancel_rx, budget);
565
566    let scanner = Scanner::new(until_tokens);
567    let mut proc = proc;
568    let mut hit: Option<String> = None;
569    let mut stream_error: Option<claude_proc::Error> = None;
570    for line in proc.by_ref() {
571        match line {
572            Ok(al) => {
573                if let Some(h) = scanner.feed(&al.text) {
574                    hit = Some(h.token);
575                    // Sentinel short-circuit: kill the child so the
576                    // stream does not keep reading. Both `BLOCKED`
577                    // (always reserved) and user `until` tokens end
578                    // the iteration here.
579                    let _ = killer.kill();
580                    break;
581                }
582            }
583            Err(e) => {
584                stream_error = Some(e);
585                break;
586            }
587        }
588    }
589
590    // Signal the watchdog before waiting for the child so it doesn't
591    // kill a naturally-finished process after the fact. Dropping the
592    // sender signals "cancelled" via RecvTimeoutError::Disconnected.
593    drop(cancel_tx);
594    let timed_out = watchdog.join().unwrap_or(false);
595
596    let (status, stderr) = match proc.finish() {
597        Ok(s) => s,
598        Err(e) => {
599            return Err(DispatchError::from(e));
600        }
601    };
602
603    if let Some(token) = hit {
604        return Ok(ClaudeOutcome::SentinelHit { token });
605    }
606    if timed_out {
607        return Ok(ClaudeOutcome::Timeout);
608    }
609    if let Some(e) = stream_error {
610        // Stream-level I/O errors (malformed stdin we couldn't parse)
611        // are rare — surface as a crash so the runner can record the
612        // stderr tail alongside.
613        return Ok(ClaudeOutcome::Crash {
614            stderr_tail: format!("stream error: {e}\nstderr: {}", tail(&stderr, 1024)),
615        });
616    }
617    if !status.success() {
618        return Ok(ClaudeOutcome::Crash {
619            stderr_tail: tail(&stderr, 1024),
620        });
621    }
622    Ok(ClaudeOutcome::CleanExit)
623}
624
625/// Spawn a thread that kills `killer`'s child after `budget`, unless
626/// the main thread signals cancellation first.
627///
628/// Cancellation is signalled by dropping the `cancel_tx` side of the
629/// channel (which surfaces as `RecvTimeoutError::Disconnected` on the
630/// rx side). The channel is never `.send()`-used today — a value
631/// arriving via `Ok(_)` would also mean "cancelled; don't kill", but
632/// no caller exercises that path so we treat it as unreachable to keep
633/// the intent of the protocol explicit. Returns `true` once joined iff
634/// the watchdog actually killed the child.
635fn spawn_watchdog(
636    killer: ChildKiller,
637    rx: mpsc::Receiver<()>,
638    budget: Duration,
639) -> thread::JoinHandle<bool> {
640    thread::spawn(move || match rx.recv_timeout(budget) {
641        Err(mpsc::RecvTimeoutError::Disconnected) => false,
642        Err(mpsc::RecvTimeoutError::Timeout) => {
643            let _ = killer.kill();
644            true
645        }
646        Ok(()) => {
647            // No caller currently sends on this channel; cancellation
648            // is by drop. Reaching this arm means a future refactor
649            // added an explicit cancel path without updating this
650            // comment — treat as cancellation.
651            false
652        }
653    })
654}
655
656fn build_spawn_opts(
657    node: &Node,
658    ctx: &ExecutorContext<'_>,
659    prompt: String,
660    session: Option<Session>,
661) -> SpawnOpts {
662    SpawnOpts {
663        prompt,
664        cwd: ctx.worktree.to_path_buf(),
665        model: effective_model(node, ctx).map(str::to_string),
666        allowed_tools: node.allowed_tools.clone(),
667        session,
668        extra_args: Vec::new(),
669        env_vars: build_ai_env_vars(node, ctx),
670        bin: ctx.claude_bin.map(|p| p.to_path_buf()),
671    }
672}
673
674/// Environment variables forwarded to `claude -p` subprocesses so AI
675/// nodes can read the run's identity and `--input` parameters. Mirrors
676/// the bash-node injection at [`run_bash`] (keys uppercased via
677/// [`str::to_uppercase`], values passed through verbatim) and the
678/// gate-hook injection in [`run_gate`], so all three execution paths
679/// (bash, AI, gate hook) expose the same contract to distro authors.
680fn build_ai_env_vars(node: &Node, ctx: &ExecutorContext<'_>) -> Vec<(String, String)> {
681    let mut env = Vec::with_capacity(3 + ctx.inputs.len());
682    env.push(("OMNE_RUN_ID".to_string(), ctx.run_id.to_string()));
683    env.push(("OMNE_NODE_ID".to_string(), node.id.clone()));
684    env.push((
685        "OMNE_VOLUME_ROOT".to_string(),
686        ctx.volume_root.to_string_lossy().into_owned(),
687    ));
688    for input in ctx.inputs {
689        env.push((
690            format!("OMNE_INPUT_{}", input.key.to_uppercase()),
691            input.value.clone(),
692        ));
693    }
694    env
695}
696
697fn effective_model<'a>(node: &'a Node, ctx: &'a ExecutorContext<'a>) -> Option<&'a str> {
698    node.model
699        .as_deref()
700        .or_else(|| ctx.default_model.map(|s| s as &str))
701}
702
703// ── Bash / gate shared subprocess runner ────────────────────────
704
/// Result of running a subprocess whose output is collected whole
/// rather than stream-parsed (bash bodies and gate hooks).
struct RawExit {
    /// Exit status reported when the child was reaped.
    status: std::process::ExitStatus,
    /// Everything the child wrote to stdout.
    stdout: Vec<u8>,
    /// Everything the child wrote to stderr.
    stderr: Vec<u8>,
    /// `true` when the wall-clock budget ran out and the child was killed.
    timed_out: bool,
}
713
/// Unix: make the child spawned from `cmd` the leader of a brand-new
/// process group, so [`kill_process_tree`] can signal the whole group
/// when a timeout fires.
#[cfg(unix)]
fn set_new_process_group(cmd: &mut Command) {
    // A pgid of 0 asks for a new group whose id equals the child's
    // pid; signalling the negated pid then reaches every member.
    std::os::unix::process::CommandExt::process_group(cmd, 0);
}
725
/// Windows: spawn the child from `cmd` with the
/// `CREATE_NEW_PROCESS_GROUP` creation flag, detaching it from our
/// console group so that `taskkill /T` walks the right tree.
#[cfg(windows)]
fn set_new_process_group(cmd: &mut Command) {
    use std::os::windows::process::CommandExt;

    // Win32 process-creation flag CREATE_NEW_PROCESS_GROUP.
    const NEW_GROUP_FLAG: u32 = 0x0000_0200;
    cmd.creation_flags(NEW_GROUP_FLAG);
}
735
/// Unix: send `SIGKILL` to every process in the group led by `pid` by
/// running `kill -KILL -- -<pid>`.
///
/// Called when a wall-clock budget expires, so that backgrounded
/// grandchildren cannot keep the drained pipe handles open past the
/// timeout. The work is delegated to the platform `kill` utility
/// rather than a raw syscall wrapper. Best effort: the utility's I/O
/// is discarded and its result ignored.
#[cfg(unix)]
fn kill_process_tree(pid: u32) {
    // A negative pid addresses the whole process group.
    let group = format!("-{pid}");
    let mut kill = Command::new("kill");
    kill.arg("-KILL").arg("--").arg(group);
    kill.stdin(Stdio::null());
    kill.stdout(Stdio::null());
    kill.stderr(Stdio::null());
    let _ = kill.status();
}
755
/// Windows: forcibly terminate the process tree rooted at `pid` by
/// running `taskkill /T /F /PID <pid>`. Best effort: the utility's I/O
/// is discarded and its result ignored.
#[cfg(windows)]
fn kill_process_tree(pid: u32) {
    let target = pid.to_string();
    let mut taskkill = Command::new("taskkill");
    taskkill.arg("/T").arg("/F").arg("/PID").arg(target);
    taskkill.stdin(Stdio::null());
    taskkill.stdout(Stdio::null());
    taskkill.stderr(Stdio::null());
    let _ = taskkill.status();
}
765
766fn run_command_with_timeout(
767    cmd: &mut Command,
768    budget: Duration,
769    capture_stdout_at: Option<&Path>,
770) -> Result<RawExit, DispatchError> {
771    // Place the child in its own process group / console group so a
772    // timeout kill reaches backgrounded grandchildren too. Without
773    // this, a bash body that spawns a background subprocess (`& sleep
774    // 60` on Unix, or any native `start` / pipeline spawn on Windows)
775    // inherits our stdout/stderr pipes; killing only the direct child
776    // leaves grandchildren holding the pipe handles open and
777    // `stdout_thread.join()` blocks until those grandchildren die
778    // naturally — potentially forever.
779    set_new_process_group(cmd);
780    let mut child = cmd
781        .spawn()
782        .map_err(|source| DispatchError::Spawn { source })?;
783    let child_pid = child.id();
784
785    // Drain stdout + stderr on dedicated threads so buffer-full
786    // deadlocks never stall a long-running child, same pattern as
787    // `ClaudeProcess`.
788    let stdout_handle = child.stdout.take();
789    let stderr_handle = child.stderr.take();
790    let stdout_thread = stdout_handle.map(|mut s| {
791        thread::spawn(move || -> std::io::Result<Vec<u8>> {
792            let mut buf = Vec::new();
793            s.read_to_end(&mut buf)?;
794            Ok(buf)
795        })
796    });
797    let stderr_thread = stderr_handle.map(|mut s| {
798        thread::spawn(move || -> std::io::Result<Vec<u8>> {
799            let mut buf = Vec::new();
800            s.read_to_end(&mut buf)?;
801            Ok(buf)
802        })
803    });
804
805    use wait_timeout::ChildExt;
806    let (status, timed_out) = match child
807        .wait_timeout(budget)
808        .map_err(|source| DispatchError::Wait { source })?
809    {
810        Some(s) => (s, false),
811        None => {
812            // Tree-kill first so grandchildren release the pipe
813            // handles that would otherwise block `stdout_thread.join`
814            // forever. `child.kill()` is a belt-and-suspenders safety
815            // net for the direct child on platforms where the
816            // tree-kill utility is missing.
817            kill_process_tree(child_pid);
818            let _ = child.kill();
819            let s = child
820                .wait()
821                .map_err(|source| DispatchError::Wait { source })?;
822            (s, true)
823        }
824    };
825
826    let stdout = stdout_thread
827        .map(|h| {
828            h.join()
829                .unwrap_or_else(|_| Ok(Vec::new()))
830                .unwrap_or_default()
831        })
832        .unwrap_or_default();
833    let stderr = stderr_thread
834        .map(|h| {
835            h.join()
836                .unwrap_or_else(|_| Ok(Vec::new()))
837                .unwrap_or_default()
838        })
839        .unwrap_or_default();
840
841    if let Some(cap) = capture_stdout_at {
842        std::fs::write(cap, &stdout).map_err(|source| DispatchError::Io {
843            path: cap.to_path_buf(),
844            source,
845        })?;
846    }
847    Ok(RawExit {
848        status,
849        stdout,
850        stderr,
851        timed_out,
852    })
853}
854
855fn outcome_from_exit(raw: RawExit) -> NodeOutcome {
856    if raw.timed_out {
857        return NodeOutcome::Failed {
858            kind: ErrorKind::Timeout,
859            message: Some(format!(
860                "subprocess killed after exceeding wall-clock budget; stderr: {}",
861                String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024)).into_owned()
862            )),
863        };
864    }
865    if raw.status.success() {
866        return NodeOutcome::Completed;
867    }
868    NodeOutcome::Failed {
869        kind: ErrorKind::Crash,
870        message: Some(
871            String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
872                .trim()
873                .to_string(),
874        ),
875    }
876}
877
878// ── Gate path ────────────────────────────────────────────────────
879
880fn run_gate(
881    node: &Node,
882    gate: &str,
883    ctx: &ExecutorContext<'_>,
884) -> Result<NodeOutcome, DispatchError> {
885    let script = gate_script_path(ctx.volume_root, gate);
886    // Validator checked existence at load time, but the file could
887    // have been deleted in the interval. Recheck so a missing hook
888    // surfaces as `NodeFailed{GateFailed}` rather than a spawn-time
889    // `DispatchError` the caller has to special-case.
890    if !script.is_file() {
891        return Ok(NodeOutcome::Failed {
892            kind: ErrorKind::GateFailed,
893            message: Some(format!(
894                "gate hook {} missing at dispatch time",
895                script.display()
896            )),
897        });
898    }
899    // Defence-in-depth path traversal check. The validator already
900    // rejects gate names with `/`, `\`, or `..`, but a DAG-earlier
901    // bash node could plant a file at a traversal target after
902    // validation ran — we rebuild the canonical path here and assert
903    // it's still under `dist/hooks/`.
904    if let Some(outcome) = enforce_gate_boundary(&script, ctx.volume_root, gate)? {
905        return Ok(outcome);
906    }
907    // Gate runs from the volume root, not the per-run worktree. Hooks
908    // are distro-level scripts that may need to see the whole volume
909    // layout (e.g. `.omne/lib/cfg/`), so placing them in the worktree
910    // would hide paths that only exist at the root.
911    let mut cmd = gate_command(&script);
912    cmd.current_dir(ctx.volume_root)
913        .stdin(Stdio::null())
914        .stdout(Stdio::piped())
915        .stderr(Stdio::piped())
916        .env("OMNE_RUN_ID", ctx.run_id)
917        .env("OMNE_NODE_ID", &node.id)
918        .env("OMNE_GATE_NAME", gate)
919        .env("OMNE_VOLUME_ROOT", ctx.volume_root);
920
921    let raw = run_command_with_timeout(&mut cmd, ctx.gate_timeout, None)?;
922    if raw.timed_out {
923        return Ok(NodeOutcome::Failed {
924            kind: ErrorKind::GateTimeout,
925            message: Some(format!(
926                "gate {gate} exceeded {}s budget",
927                ctx.gate_timeout.as_secs()
928            )),
929        });
930    }
931    if !raw.status.success() {
932        return Ok(NodeOutcome::Failed {
933            kind: ErrorKind::GateFailed,
934            message: Some(
935                String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
936                    .trim()
937                    .to_string(),
938            ),
939        });
940    }
941
942    let stdout_tail = if raw.stdout.is_empty() {
943        None
944    } else {
945        let trimmed = String::from_utf8_lossy(&tail_bytes(&raw.stdout, 1024))
946            .trim()
947            .to_string();
948        (!trimmed.is_empty()).then_some(trimmed)
949    };
950
951    ctx.event_log
952        .append(&Event::GatePassed(GatePassed {
953            id: new_event_id(),
954            ts: iso_utc_now(),
955            run_id: ctx.run_id.to_string(),
956            node_id: node.id.clone(),
957            gate: gate.to_string(),
958            method: GateMethod::Hook,
959            stdout: stdout_tail,
960        }))
961        .map_err(DispatchError::from)?;
962    Ok(NodeOutcome::Completed)
963}
964
965/// Assert that `script` (known to exist at this point) resolves inside
966/// `<volume_root>/.omne/dist/hooks/`. Returns `Some(NodeFailed)` when
967/// the resolved path escapes the hooks directory, so the caller can
968/// record a clean `gate.failed` terminal event instead of propagating
969/// a `DispatchError`. Returns `None` when the path is in bounds.
970///
971/// Both paths are `canonicalize`d so symlinks that leave the hooks dir
972/// are also rejected. The check is best-effort: if the hooks directory
973/// itself is missing (distro not installed) we return `None` and let
974/// the subsequent spawn fail with a standard error.
975fn enforce_gate_boundary(
976    script: &Path,
977    volume_root: &Path,
978    gate: &str,
979) -> Result<Option<NodeOutcome>, DispatchError> {
980    let hooks_dir = volume::dist_dir(volume_root).join("hooks");
981    let canonical_hooks = match hooks_dir.canonicalize() {
982        Ok(p) => p,
983        Err(_) => return Ok(None),
984    };
985    let canonical_script = script.canonicalize().map_err(|source| DispatchError::Io {
986        path: script.to_path_buf(),
987        source,
988    })?;
989    if !canonical_script.starts_with(&canonical_hooks) {
990        return Ok(Some(NodeOutcome::Failed {
991            kind: ErrorKind::GateFailed,
992            message: Some(format!(
993                "gate `{gate}` resolves outside {}: {}",
994                canonical_hooks.display(),
995                canonical_script.display()
996            )),
997        }));
998    }
999    Ok(None)
1000}
1001
1002fn gate_script_path(volume_root: &Path, gate: &str) -> PathBuf {
1003    volume::dist_dir(volume_root)
1004        .join("hooks")
1005        .join(format!("{gate}.{}", platform_hook_extension()))
1006}
1007
/// Build the PowerShell invocation that runs the gate hook `script`:
/// `powershell -NoProfile -File <script>`.
#[cfg(windows)]
fn gate_command(script: &Path) -> Command {
    // `-ExecutionPolicy Bypass` is left out on purpose: passing it
    // would silently override the user's AllSigned / RemoteSigned
    // policy and drop the OS signing check for every gate hook. Distro
    // authors shipping unsigned hooks should document the
    // `Unblock-File` step in their install guide instead.
    let mut powershell = Command::new("powershell");
    powershell.args(["-NoProfile", "-File"]).arg(script);
    powershell
}
1019
/// Build the shell invocation that runs the gate hook `script`:
/// `sh <script>`.
#[cfg(not(windows))]
fn gate_command(script: &Path) -> Command {
    let mut shell = Command::new("sh");
    shell.args([script]);
    shell
}
1026
/// File extension (without the dot) of gate hook scripts on the
/// platform this binary was compiled for: `ps1` on Windows, `sh`
/// everywhere else.
fn platform_hook_extension() -> &'static str {
    if cfg!(windows) {
        "ps1"
    } else {
        "sh"
    }
}
1036
1037// ── Terminal event emission ─────────────────────────────────────
1038
1039fn emit_terminal(
1040    node: &Node,
1041    outcome: &NodeOutcome,
1042    ctx: &ExecutorContext<'_>,
1043) -> Result<(), DispatchError> {
1044    let event = match outcome {
1045        NodeOutcome::Completed => Event::NodeCompleted(NodeCompleted {
1046            id: new_event_id(),
1047            ts: iso_utc_now(),
1048            run_id: ctx.run_id.to_string(),
1049            node_id: node.id.clone(),
1050            output_path: capture_output_path_wire(ctx.run_id, &node.id),
1051        }),
1052        NodeOutcome::Failed { kind, message } => Event::NodeFailed(NodeFailed {
1053            id: new_event_id(),
1054            ts: iso_utc_now(),
1055            run_id: ctx.run_id.to_string(),
1056            node_id: node.id.clone(),
1057            error: NodeError { kind: *kind },
1058            message: message.clone(),
1059        }),
1060    };
1061    ctx.event_log.append(&event).map_err(DispatchError::from)?;
1062    Ok(())
1063}
1064
1065// ── Helpers ──────────────────────────────────────────────────────
1066
1067fn node_capture_path(ctx: &ExecutorContext<'_>, node_id: &str) -> PathBuf {
1068    volume::nodes_dir(ctx.volume_root, ctx.run_id).join(format!("{node_id}.out"))
1069}
1070
1071/// Forward-slash, volume-root-relative path for the wire `output_path`
1072/// field on `node.completed`. Single source of truth lives in
1073/// [`volume::node_capture_wire_path`] so a layout change in `volume.rs`
1074/// propagates here automatically.
1075fn capture_output_path_wire(run_id: &str, node_id: &str) -> String {
1076    volume::node_capture_wire_path(run_id, node_id)
1077}
1078
1079fn ensure_parent_dir(path: &Path) -> Result<(), DispatchError> {
1080    if let Some(parent) = path.parent() {
1081        if !parent.as_os_str().is_empty() {
1082            std::fs::create_dir_all(parent).map_err(|source| DispatchError::Io {
1083                path: parent.to_path_buf(),
1084                source,
1085            })?;
1086        }
1087    }
1088    Ok(())
1089}
1090
1091fn node_timeout(node: &Node, ctx: &ExecutorContext<'_>) -> Duration {
1092    node.timeout
1093        .map(Duration::from_secs)
1094        .unwrap_or(ctx.default_node_timeout)
1095}
1096
1097fn new_event_id() -> String {
1098    Ulid::new().to_string().to_lowercase()
1099}
1100
1101/// Current UTC instant as `YYYY-MM-DDTHH:MM:SSZ`. Delegates to
1102/// [`crate::clock`] so `init::chrono_today` and `events.jsonl`
1103/// timestamps share a single civil-calendar implementation.
1104fn iso_utc_now() -> String {
1105    crate::clock::now_utc().format_iso_utc()
1106}
1107
/// Last `max_bytes` bytes of `b` (or all of `b` when it is no longer
/// than that), with the start moved forward past any UTF-8
/// continuation bytes.
///
/// Doing the alignment here means a caller that decodes the result
/// with `from_utf8_lossy` never gets a leading U+FFFD caused by the cut
/// landing in the middle of a multi-byte codepoint. The result may
/// therefore be shorter than `max_bytes`. Input that fits entirely is
/// returned unchanged, without any alignment.
fn tail_bytes(b: &[u8], max_bytes: usize) -> Vec<u8> {
    if b.len() <= max_bytes {
        return b.to_vec();
    }

    let window = &b[b.len() - max_bytes..];
    // Continuation bytes have the bit pattern `10xxxxxx` (0x80..=0xBF);
    // the first byte that is not one is a lead byte or plain ASCII.
    let first_boundary = window
        .iter()
        .position(|&byte| (byte & 0b1100_0000) != 0b1000_0000)
        .unwrap_or(window.len());

    window[first_boundary..].to_vec()
}
1126
1127/// UTF-8-safe trimmed tail of `s`. Built on top of [`tail_bytes`] so
1128/// the boundary-alignment logic lives in one place.
1129fn tail(s: &str, max_bytes: usize) -> String {
1130    let bytes = tail_bytes(s.as_bytes(), max_bytes);
1131    String::from_utf8_lossy(&bytes).trim().to_string()
1132}
1133
1134// ── Errors ──────────────────────────────────────────────────────
1135
1136/// Failure categories returned from [`dispatch`] itself (distinct from
1137/// `NodeOutcome::Failed`, which is a *recorded* node failure). These
1138/// are infrastructure errors that prevent the executor from even
1139/// deciding the node's outcome — event-log I/O, malformed pipe data
1140/// leaking past validation, stream-parser crashes, etc.
1141#[derive(Debug, thiserror::Error)]
1142pub enum DispatchError {
1143    #[error("node `{node_id}` rejected by executor: {reason}")]
1144    InvalidNode { node_id: String, reason: String },
1145
1146    #[error("event log error: {0}")]
1147    EventLog(#[from] crate::event_log::Error),
1148
1149    #[error("claude subprocess error: {0}")]
1150    ClaudeProc(#[from] claude_proc::Error),
1151
1152    /// Path-bearing I/O error (capture-file writes, iteration-marker
1153    /// writes, `mkdir_p`). Spawn and wait failures use [`Spawn`] /
1154    /// [`Wait`] instead so the error message does not render as
1155    /// `I/O error on : ...` with an empty path.
1156    #[error("I/O error on {}: {source}", path.display())]
1157    Io {
1158        path: PathBuf,
1159        #[source]
1160        source: std::io::Error,
1161    },
1162
1163    /// `Command::spawn` failed before the child ever started. The
1164    /// `Command` itself is lost (spawn consumed it by reference but we
1165    /// have no useful identifier beyond the originating subprocess
1166    /// family, which the caller can infer from context).
1167    #[error("failed to spawn subprocess: {source}")]
1168    Spawn {
1169        #[source]
1170        source: std::io::Error,
1171    },
1172
1173    /// `wait_timeout` / `wait` on a live child reported an I/O error
1174    /// (EINTR left unhandled, process reaping race). Distinct from
1175    /// [`Spawn`] so telemetry can distinguish start-time failures from
1176    /// mid-flight failures.
1177    #[error("failed to wait on subprocess: {source}")]
1178    Wait {
1179        #[source]
1180        source: std::io::Error,
1181    },
1182}
1183
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::EventLog;
    use crate::events::Input;
    use crate::pipe::{Node, TriggerRule};
    use std::path::PathBuf;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Smallest AI node the env-var tests need: a `plan` command with
    /// every optional field left unset.
    fn bare_ai_node(id: &str) -> Node {
        Node {
            id: id.into(),
            depends_on: vec![],
            model: None,
            allowed_tools: vec![],
            gate: None,
            timeout: None,
            trigger_rule: TriggerRule::AllSuccess,
            command: Some("plan".into()),
            prompt: None,
            bash: None,
            loop_: None,
        }
    }

    /// The timestamp has the fixed-width `YYYY-MM-DDTHH:MM:SSZ` shape.
    #[test]
    fn iso_utc_now_is_plausible_shape() {
        let stamp = iso_utc_now();
        assert_eq!(stamp.len(), 20);
        assert!(stamp.ends_with('Z'));
        assert!(stamp.contains('T'));
    }

    /// `tail` trims surrounding whitespace and honours the byte bound.
    #[test]
    fn tail_trims_and_bounds() {
        assert_eq!(tail("   hi  ", 1024), "hi");

        let long_input = "x".repeat(5000);
        let clipped = tail(&long_input, 10);
        assert_eq!(clipped.len(), 10);
    }

    /// The wire path is volume-root-relative and never uses backslashes.
    #[test]
    fn output_path_wire_uses_forward_slashes() {
        let wire = capture_output_path_wire("feature-01abc", "research");
        assert!(!wire.contains('\\'));
        assert_eq!(wire, ".omne/var/runs/feature-01abc/nodes/research.out");
    }

    /// Base `OMNE_*` variables and one `OMNE_INPUT_*` variable per input
    /// are all present.
    #[test]
    fn build_ai_env_vars_contains_base_and_input_vars() {
        let scratch = TempDir::new().unwrap();
        let event_log = EventLog::for_run(scratch.path(), "feature-01abc").unwrap();
        let volume_root: PathBuf = scratch.path().to_path_buf();
        let worktree: PathBuf = scratch.path().join(".omne/wt/feature-01abc");
        let inputs: Vec<Input> = [("feature_name", "add-hello"), ("scope", "src/api")]
            .into_iter()
            .map(|(key, value)| Input {
                key: key.into(),
                value: value.into(),
            })
            .collect();
        let ctx = ExecutorContext {
            volume_root: &volume_root,
            run_id: "feature-01abc",
            worktree: &worktree,
            event_log: &event_log,
            inputs: &inputs,
            default_model: None,
            claude_bin: None,
            default_node_timeout: Duration::from_secs(60),
            gate_timeout: Duration::from_secs(60),
        };
        let node = bare_ai_node("plan");

        let env = build_ai_env_vars(&node, &ctx);

        let value_of = |name: &str| {
            env.iter()
                .find(|(key, _)| key == name)
                .map(|(_, value)| value.clone())
        };
        let expected = [
            ("OMNE_RUN_ID", "feature-01abc".to_string()),
            ("OMNE_NODE_ID", "plan".to_string()),
            (
                "OMNE_VOLUME_ROOT",
                volume_root.to_string_lossy().into_owned(),
            ),
            ("OMNE_INPUT_FEATURE_NAME", "add-hello".to_string()),
            ("OMNE_INPUT_SCOPE", "src/api".to_string()),
        ];
        for (name, want) in expected {
            assert_eq!(value_of(name), Some(want), "env var {name}");
        }
    }

    /// With no inputs only the three base variables are emitted.
    #[test]
    fn build_ai_env_vars_no_inputs_still_has_base_vars() {
        let scratch = TempDir::new().unwrap();
        let event_log = EventLog::for_run(scratch.path(), "feature-01xyz").unwrap();
        let volume_root: PathBuf = scratch.path().to_path_buf();
        let worktree: PathBuf = scratch.path().join(".omne/wt/feature-01xyz");
        let ctx = ExecutorContext {
            volume_root: &volume_root,
            run_id: "feature-01xyz",
            worktree: &worktree,
            event_log: &event_log,
            inputs: &[],
            default_model: None,
            claude_bin: None,
            default_node_timeout: Duration::from_secs(60),
            gate_timeout: Duration::from_secs(60),
        };
        let node = bare_ai_node("research");

        let env = build_ai_env_vars(&node, &ctx);

        assert_eq!(env.len(), 3);
        assert!(!env.iter().any(|(name, _)| name.starts_with("OMNE_INPUT_")));
    }
}