Skip to main content

omne_cli/
executor.rs

1//! Node executor (plan Unit 11).
2//!
3//! `dispatch(node, ctx)` is the single entry point the DAG runner in
4//! Unit 12 calls for each `ready()` node. Responsibilities:
5//!
6//! 1. Emit `node.started` before any subprocess spawn.
7//! 2. Dispatch to the node's kind — bash, AI (command / prompt), or
8//!    loop — and run the subprocess under a wall-clock deadline.
9//! 3. For loops, iterate under a sentinel [`Scanner`] that watches for
10//!    `BLOCKED` (always reserved) plus the user's `until` token;
11//!    session lifecycle follows the Unit 0 spike (`--session-id` first
12//!    iteration, `--resume` thereafter, UUID not ULID).
13//! 4. If the node carries a `gate`, run the platform-current hook
14//!    script under a 60s deadline after a successful node terminates.
15//! 5. Emit exactly one terminal event (`node.completed` or
16//!    `node.failed`) and, when a gate ran, a `gate.passed` event
17//!    between the node's body completing and the terminal event.
18//!
19//! The executor is the sole owner of node-level deadline handling —
20//! `claude_proc::ClaudeProcess` deliberately has no per-call timeout so
21//! that this module can decide, for example, that a loop's wall-clock
22//! budget covers all iterations rather than each one independently.
23
24#![allow(dead_code)]
25
26use std::io::{Read, Write};
27use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::mpsc;
30use std::thread;
31use std::time::{Duration, Instant};
32
33use ulid::Ulid;
34use uuid::Uuid;
35
36use crate::claude_proc::{self, CaptureMode, ChildKiller, ClaudeProcess, Session, SpawnOpts};
37use crate::event_log::EventLog;
38use crate::events::{
39    ErrorKind, Event, GateMethod, GatePassed, Input, IterationStarted, NodeCompleted, NodeError,
40    NodeFailed, NodeKind, NodeStarted,
41};
42use crate::pipe::{ExecutionKind, LoopBody, Node};
43use crate::sentinel::{self, Scanner};
44use crate::volume;
45
/// Fallback wall-clock budget for a node whose pipe definition carries
/// no `timeout:`. Equals the memory-locked 1800s (30 minute) default
/// called out in the plan's "Open questions deferred to implementation"
/// block.
pub const DEFAULT_NODE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
50
/// Deadline applied to gate hook execution: 60 seconds, per plan R12.
pub const GATE_TIMEOUT: Duration = Duration::from_millis(60_000);
53
54/// Inputs the executor needs that span every dispatch call on one run.
55///
56/// Built once by Unit 12's `omne run` handler and threaded through every
57/// node. Borrowed data everywhere; the executor never needs ownership.
58pub struct ExecutorContext<'a> {
59    /// `.omne/` volume root. Resolves worktree + events paths.
60    pub volume_root: &'a Path,
61    /// `run_id` (e.g. `feature-01arz3ndektsv4rrffq69g5fa0`).
62    pub run_id: &'a str,
63    /// Per-run worktree cwd (`.omne/wt/<run_id>`). All subprocesses
64    /// spawn with this as their cwd.
65    pub worktree: &'a Path,
66    /// Append-only log; executor emits `node.*` + `gate.passed` through
67    /// it.
68    pub event_log: &'a EventLog,
69    /// `--input k=v` pairs. Bash nodes export these as
70    /// `OMNE_INPUT_<KEY>`; AI nodes receive them via the gate hook env
71    /// and the pipe-level prompt template (not interpolated at the
72    /// runner in v1 — the pipe body references them).
73    pub inputs: &'a [Input],
74    /// Pipe-level `default_model:`. AI nodes inherit it unless they
75    /// specify their own `model:`.
76    pub default_model: Option<&'a str>,
77    /// Optional `claude` binary override, mainly a test seam.
78    pub claude_bin: Option<&'a Path>,
79    /// Wall-clock budget when a node does not override `timeout:`.
80    /// Defaults to [`DEFAULT_NODE_TIMEOUT`].
81    pub default_node_timeout: Duration,
82    /// Wall-clock budget for gate hook execution. Defaults to
83    /// [`GATE_TIMEOUT`] (60s per plan R12). Exposed on the context so
84    /// integration tests can drive a short gate-timeout path without
85    /// waiting a full minute.
86    pub gate_timeout: Duration,
87}
88
89impl<'a> ExecutorContext<'a> {
90    /// Convenience constructor that fills in the plan-default wall-clock
91    /// budgets. `worktree` is typically `.omne/wt/<run_id>` — the
92    /// runner computes it once and threads it through.
93    pub fn new(
94        volume_root: &'a Path,
95        run_id: &'a str,
96        worktree: &'a Path,
97        event_log: &'a EventLog,
98        inputs: &'a [Input],
99    ) -> Self {
100        Self {
101            volume_root,
102            run_id,
103            worktree,
104            event_log,
105            inputs,
106            default_model: None,
107            claude_bin: None,
108            default_node_timeout: DEFAULT_NODE_TIMEOUT,
109            gate_timeout: GATE_TIMEOUT,
110        }
111    }
112}
113
/// Result of dispatching one node.
///
/// `Completed` and `Failed` map 1:1 to `node.completed` and
/// `node.failed` in the event log; the runner threads the variant
/// through `dag::Scheduler::mark`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum NodeOutcome {
    /// Terminal success; mirrors the `node.completed` event.
    Completed,
    /// Terminal failure; mirrors the `node.failed` event.
    Failed {
        /// Failure category. Values produced in this module include
        /// `Timeout`, `Blocked`, `Crash`, `HostMissing` and
        /// `MaxIterationsExceeded`.
        kind: ErrorKind,
        /// Optional human-readable detail (for example a stderr tail or
        /// a timeout description).
        message: Option<String>,
    },
}
127
128/// Dispatch one node end-to-end.
129///
130/// Emits `node.started` on entry, runs the node body, runs any attached
131/// gate, and emits exactly one terminal event (`node.completed` or
132/// `node.failed`). The returned [`NodeOutcome`] mirrors the terminal
133/// event so the runner can feed the scheduler without re-reading the
134/// event log.
135pub fn dispatch(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
136    let kind = node
137        .execution_kind()
138        .ok_or_else(|| DispatchError::InvalidNode {
139            node_id: node.id.clone(),
140            reason: "node has no execution kind (validator should have caught this)".into(),
141        })?;
142
143    let event_kind = match kind {
144        ExecutionKind::Command => NodeKind::Command,
145        ExecutionKind::Prompt => NodeKind::Prompt,
146        ExecutionKind::Bash => NodeKind::Bash,
147        ExecutionKind::Loop => NodeKind::Loop,
148    };
149
150    ctx.event_log
151        .append(&Event::NodeStarted(NodeStarted {
152            id: new_event_id(),
153            ts: iso_utc_now(),
154            run_id: ctx.run_id.to_string(),
155            node_id: node.id.clone(),
156            kind: event_kind,
157            name: None,
158            model: effective_model(node, ctx).map(str::to_string),
159        }))
160        .map_err(DispatchError::from)?;
161
162    // Compute the body + gate outcome with infrastructure errors
163    // folded into a `NodeFailed{Crash}` so `node.started` is always
164    // paired with exactly one terminal event. The only residual orphan
165    // window is `emit_terminal` itself failing — at that point the
166    // event log is unwritable and no compensation is possible.
167    let final_outcome = body_with_gate(node, kind, ctx);
168    emit_terminal(node, &final_outcome, ctx)?;
169    Ok(final_outcome)
170}
171
172/// Run the node body (and gate, if any) returning a `NodeOutcome` that
173/// is safe to emit as a terminal event. Dispatch-level infrastructure
174/// errors — failed spawns, event-log writes during the body, capture
175/// I/O — are converted to `NodeFailed{Crash}` with the error message
176/// embedded, so the recorded event stream never contains a dangling
177/// `node.started`.
178fn body_with_gate(node: &Node, kind: ExecutionKind, ctx: &ExecutorContext<'_>) -> NodeOutcome {
179    let body_outcome = match kind {
180        ExecutionKind::Bash => run_bash(node, ctx),
181        ExecutionKind::Command | ExecutionKind::Prompt => run_ai(node, ctx),
182        ExecutionKind::Loop => run_loop(node, ctx),
183    };
184    let body_outcome = match body_outcome {
185        Ok(o) => o,
186        Err(e) => {
187            return NodeOutcome::Failed {
188                kind: ErrorKind::Crash,
189                message: Some(format!("dispatch error in node body: {e}")),
190            }
191        }
192    };
193    match body_outcome {
194        NodeOutcome::Completed => match node.gate.as_deref() {
195            Some(gate) => match run_gate(node, gate, ctx) {
196                Ok(o) => o,
197                Err(e) => NodeOutcome::Failed {
198                    kind: ErrorKind::GateFailed,
199                    message: Some(format!("dispatch error in gate: {e}")),
200                },
201            },
202            None => NodeOutcome::Completed,
203        },
204        other => other,
205    }
206}
207
208// ── Bash path ──────────────────────────────────────────────────────
209
210fn run_bash(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
211    let bash_body = node
212        .bash
213        .as_deref()
214        .ok_or_else(|| DispatchError::InvalidNode {
215            node_id: node.id.clone(),
216            reason: "bash node missing bash: body".into(),
217        })?;
218
219    let capture_path = node_capture_path(ctx, &node.id);
220    ensure_parent_dir(&capture_path)?;
221
222    let mut cmd = bash_command(bash_body);
223    cmd.current_dir(ctx.worktree)
224        .stdin(Stdio::null())
225        .stdout(Stdio::piped())
226        .stderr(Stdio::piped());
227    for input in ctx.inputs {
228        let env_key = format!("OMNE_INPUT_{}", input.key.to_uppercase());
229        cmd.env(env_key, &input.value);
230    }
231
232    let budget = node_timeout(node, ctx);
233    let outcome = run_command_with_timeout(&mut cmd, budget, Some(&capture_path))?;
234    Ok(outcome_from_exit(outcome))
235}
236
#[cfg(windows)]
fn bash_command(body: &str) -> Command {
    use std::os::windows::process::CommandExt;
    // Same approach as `tests/claude_proc_stub.rs`: hand `cmd.exe` a
    // pre-built command line through `raw_arg` so Rust's default
    // Windows argument escaper never touches it. That escaper would
    // double-escape embedded double quotes, which `cmd /S /C "..."`
    // then mis-parses. With `/S`, `cmd.exe` strips exactly the outer
    // pair of quotes before interpreting the rest, so the body is
    // forwarded verbatim (inner `"` characters included).
    //
    // cmd's own metacharacter re-parsing (`&`, `|`, `%VAR%`) cannot be
    // neutralised without switching to PowerShell or WSL — documented
    // deferred scope. What this mitigates is Rust-level quote mangling,
    // which would silently corrupt bodies containing double quotes
    // regardless of cmd behaviour.
    let command_line = format!("/S /C \"{body}\"");
    let mut shell = Command::new("cmd");
    shell.raw_arg(command_line);
    shell
}
256
/// Build the shell invocation for a bash node body on non-Windows
/// hosts: `sh -c <body>`, with the body passed as one argument.
#[cfg(not(windows))]
fn bash_command(body: &str) -> Command {
    let mut shell = Command::new("sh");
    shell.args(["-c", body]);
    shell
}
263
264// ── AI path (command / prompt) ────────────────────────────────────
265
266fn run_ai(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
267    let prompt = ai_prompt_for(node)?;
268    let capture_path = node_capture_path(ctx, &node.id);
269    ensure_parent_dir(&capture_path)?;
270
271    let opts = build_spawn_opts(node, ctx, prompt, None);
272    let budget = node_timeout(node, ctx);
273    let outcome = run_claude_iteration(&opts, &capture_path, CaptureMode::Truncate, budget, &[])?;
274
275    Ok(match outcome {
276        ClaudeOutcome::CleanExit => NodeOutcome::Completed,
277        ClaudeOutcome::SentinelHit { token: _ } => {
278            // Only registered sentinel for non-loop AI nodes is BLOCKED.
279            NodeOutcome::Failed {
280                kind: ErrorKind::Blocked,
281                message: Some("assistant emitted BLOCKED".into()),
282            }
283        }
284        other => claude_failure(other, node, budget),
285    })
286}
287
288/// Translate the non-`CleanExit`, non-`SentinelHit` arms of
289/// [`ClaudeOutcome`] into [`NodeOutcome::Failed`]. Shared by `run_ai`
290/// and `run_loop` so their error-message shapes do not drift. The
291/// sentinel-hit and clean-exit cases differ per caller (the AI path
292/// treats any hit as BLOCKED; the loop path discriminates BLOCKED vs
293/// `until`), so they stay inline at each call site.
294fn claude_failure(outcome: ClaudeOutcome, node: &Node, budget: Duration) -> NodeOutcome {
295    match outcome {
296        ClaudeOutcome::CleanExit | ClaudeOutcome::SentinelHit { .. } => {
297            unreachable!("claude_failure invoked on a non-failure outcome");
298        }
299        ClaudeOutcome::Timeout => NodeOutcome::Failed {
300            kind: ErrorKind::Timeout,
301            message: Some(format!(
302                "node {} exceeded timeout of {}s",
303                node.id,
304                budget.as_secs()
305            )),
306        },
307        ClaudeOutcome::HostMissing => NodeOutcome::Failed {
308            kind: ErrorKind::HostMissing,
309            message: Some("claude binary not found on PATH".into()),
310        },
311        ClaudeOutcome::Crash { stderr_tail } => NodeOutcome::Failed {
312            kind: ErrorKind::Crash,
313            message: Some(stderr_tail),
314        },
315    }
316}
317
318fn ai_prompt_for(node: &Node) -> Result<String, DispatchError> {
319    if let Some(prompt) = &node.prompt {
320        return Ok(prompt.clone());
321    }
322    if let Some(command) = &node.command {
323        // v1 convention: commands map to Claude Code slash commands.
324        return Ok(format!("/{command}"));
325    }
326    if node.loop_.is_some() {
327        return Err(DispatchError::InvalidNode {
328            node_id: node.id.clone(),
329            reason: "loop body should route through run_loop, not run_ai".into(),
330        });
331    }
332    Err(DispatchError::InvalidNode {
333        node_id: node.id.clone(),
334        reason: "AI node carried neither prompt: nor command:".into(),
335    })
336}
337
338// ── Loop path ────────────────────────────────────────────────────────
339
/// Execute a loop node: re-run the loop body's prompt through `claude`
/// up to `max_iterations` times, all under one shared wall-clock budget.
///
/// Outcomes:
/// - the `until` token fires → `Completed`;
/// - `BLOCKED` fires → `Failed{Blocked}` (checked before `until`);
/// - the budget is exhausted before an iteration starts, or the
///   subprocess times out / crashes / cannot be found → the matching
///   `Failed` outcome;
/// - every iteration exits cleanly without a sentinel →
///   `Failed{MaxIterationsExceeded}`.
///
/// Each iteration appends a marker line to the capture file and emits
/// an `iteration.started` event carrying the byte offset just past that
/// marker.
///
/// # Errors
///
/// Returns `InvalidNode` when the node has no loop body, when the body
/// has neither prompt nor command, or when the scanner reports a token
/// that is neither `BLOCKED` nor `until`. Propagates capture-file,
/// event-log and subprocess errors.
fn run_loop(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
    let body = node
        .loop_
        .as_ref()
        .ok_or_else(|| DispatchError::InvalidNode {
            node_id: node.id.clone(),
            reason: "loop dispatch on a non-loop node".into(),
        })?;

    let prompt = loop_body_prompt(node, body)?;
    let capture_path = node_capture_path(ctx, &node.id);
    ensure_parent_dir(&capture_path)?;

    // Pre-allocate the session UUID when the loop wants a resumed
    // session. `claude --session-id` requires a UUID (spike-locked).
    // With `fresh_context` set, no session is passed at all.
    let session_uuid = if !body.fresh_context {
        Some(Uuid::new_v4().to_string())
    } else {
        None
    };

    let until_tokens: Vec<String> = vec![body.until.clone()];
    let budget = node_timeout(node, ctx);
    // One deadline for the whole loop: the budget covers all iterations
    // together, not each one independently.
    let deadline = Instant::now() + budget;

    for iter in 1..=body.max_iterations {
        // Deadline first: do not mutate the capture file after the
        // wall-clock budget has expired. This prevents dangling
        // iteration marker lines with no body below on
        // timeout: 0 / near-exhausted-budget runs.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Ok(NodeOutcome::Failed {
                kind: ErrorKind::Timeout,
                message: Some(format!(
                    "loop {} exceeded timeout of {}s",
                    node.id,
                    budget.as_secs()
                )),
            });
        }

        // Iter 1 truncates any stale capture from a prior dispatch of
        // this node; subsequent iterations append below the iter-1
        // marker. StreamParser then opens the file with Append so its
        // writes land after our header.
        if iter == 1 {
            truncate_capture(&capture_path)?;
        }
        let byte_offset = write_iteration_marker(&capture_path, iter)?;
        ctx.event_log
            .append(&Event::IterationStarted(IterationStarted {
                id: new_event_id(),
                ts: iso_utc_now(),
                run_id: ctx.run_id.to_string(),
                node_id: node.id.clone(),
                iteration: iter,
                byte_offset,
            }))
            .map_err(DispatchError::from)?;

        // Session lifecycle: create the session on the first iteration,
        // resume the same UUID on every later one.
        let session = session_uuid.as_ref().map(|uuid| {
            if iter == 1 {
                Session::New(uuid.clone())
            } else {
                Session::Resume(uuid.clone())
            }
        });

        let opts = build_spawn_opts(node, ctx, prompt.clone(), session);

        // The iteration only gets what is left of the shared budget.
        let outcome = run_claude_iteration(
            &opts,
            &capture_path,
            CaptureMode::Append,
            remaining,
            &until_tokens,
        )?;

        match outcome {
            ClaudeOutcome::SentinelHit { token } => {
                // BLOCKED always takes priority over the user's
                // `until` token — even if a malformed pipe somehow
                // slipped `until: BLOCKED` past the validator.
                if token == sentinel::BLOCKED {
                    return Ok(NodeOutcome::Failed {
                        kind: ErrorKind::Blocked,
                        message: Some(format!(
                            "loop {} assistant emitted BLOCKED on iteration {iter}",
                            node.id
                        )),
                    });
                }
                if token == body.until {
                    return Ok(NodeOutcome::Completed);
                }
                // Scanner surfaces only BLOCKED or the registered
                // `until` token; no other token can reach here.
                return Err(DispatchError::InvalidNode {
                    node_id: node.id.clone(),
                    reason: format!("scanner returned unexpected token `{token}`"),
                });
            }
            ClaudeOutcome::CleanExit => {
                // Iteration ended without firing the `until` sentinel.
                // Continue to the next iteration.
            }
            // Timeout / HostMissing / Crash. Note the timeout message
            // reports the node's full budget, not this iteration's
            // remaining slice.
            failure => return Ok(claude_failure(failure, node, budget)),
        }
    }

    // Every iteration exited cleanly but `until` never fired.
    Ok(NodeOutcome::Failed {
        kind: ErrorKind::MaxIterationsExceeded,
        message: Some(format!(
            "loop {} exhausted {} iterations without matching `{}`",
            node.id, body.max_iterations, body.until
        )),
    })
}
459
460fn loop_body_prompt(node: &Node, body: &LoopBody) -> Result<String, DispatchError> {
461    if let Some(prompt) = &body.prompt {
462        return Ok(prompt.clone());
463    }
464    if let Some(command) = &body.command {
465        return Ok(format!("/{command}"));
466    }
467    Err(DispatchError::InvalidNode {
468        node_id: node.id.clone(),
469        reason: "loop body carries neither prompt: nor command:".into(),
470    })
471}
472
/// Namespaced, machine-parseable iteration marker line. The full line
/// written to the capture file is `=== omne:iteration:<N> ===`; this
/// constant holds everything before `<N>`. The `omne:`
/// prefix plus the trailing sentinel `===` make a collision with normal
/// assistant prose vanishingly unlikely (unlike the previous
/// `--- iteration N ---` form, which clashes with Markdown setext
/// headings and `git diff` hunk headers). Agents that still prefer
/// structured boundaries should read `iteration.started` from the event
/// log, which also carries the byte offset where each iteration's
/// content begins.
const ITERATION_MARKER_PREFIX: &str = "=== omne:iteration:";
482
483/// Append an iteration marker line to `capture_path` and return the
484/// byte offset at which the iteration's assistant output will start —
485/// i.e. the file size immediately after the marker line is written.
486/// Agents slice `[byte_offset_N .. byte_offset_{N+1})` to reconstruct a
487/// single iteration's text without having to re-parse markers.
488fn write_iteration_marker(capture_path: &Path, iter: u32) -> Result<u64, DispatchError> {
489    let mut f = std::fs::OpenOptions::new()
490        .create(true)
491        .append(true)
492        .open(capture_path)
493        .map_err(|source| DispatchError::Io {
494            path: capture_path.to_path_buf(),
495            source,
496        })?;
497    writeln!(f, "\n{ITERATION_MARKER_PREFIX}{iter} ===").map_err(|source| DispatchError::Io {
498        path: capture_path.to_path_buf(),
499        source,
500    })?;
501    f.sync_data().map_err(|source| DispatchError::Io {
502        path: capture_path.to_path_buf(),
503        source,
504    })?;
505    let meta = std::fs::metadata(capture_path).map_err(|source| DispatchError::Io {
506        path: capture_path.to_path_buf(),
507        source,
508    })?;
509    Ok(meta.len())
510}
511
512/// Truncate `capture_path` to zero bytes, creating it if absent. Used
513/// by loop iteration 1 so a re-dispatch of a failed loop against the
514/// same `run_id` starts from a clean capture file rather than stacking
515/// on top of prior-run output.
516fn truncate_capture(capture_path: &Path) -> Result<(), DispatchError> {
517    std::fs::OpenOptions::new()
518        .create(true)
519        .write(true)
520        .truncate(true)
521        .open(capture_path)
522        .map(|_| ())
523        .map_err(|source| DispatchError::Io {
524            path: capture_path.to_path_buf(),
525            source,
526        })
527}
528
529// ── Claude iteration runner (shared by AI + loop) ────────────────
530
/// Outcome of one `claude -p` subprocess invocation as interpreted by
/// the executor — the scanner hit, a timeout, a clean exit, or a crash.
enum ClaudeOutcome {
    /// The stream ended, no sentinel fired, and the exit status was
    /// successful.
    CleanExit,
    /// The scanner matched `token` (either `BLOCKED` or a registered
    /// `until` token); the child is killed as soon as this happens.
    SentinelHit { token: String },
    /// The watchdog killed the child because the budget elapsed.
    Timeout,
    /// Spawning failed because the `claude` binary could not be found.
    HostMissing,
    /// The stream reported an error or the child exited unsuccessfully;
    /// `stderr_tail` carries diagnostic text built from the end of the
    /// child's stderr.
    Crash { stderr_tail: String },
}
540
/// Run one `claude -p` invocation and interpret the stream line-by-line.
///
/// A background watchdog thread kills the child when `budget` elapses;
/// the main thread drains the parser, feeds each line to a
/// [`Scanner`], and returns as soon as a sentinel hits (or the stream
/// closes). `capture_mode` lets the loop controller preserve prior
/// iterations' text with [`CaptureMode::Append`].
///
/// When several conditions hold at once the result is chosen in this
/// order: sentinel hit, watchdog timeout, stream error, unsuccessful
/// exit status, clean exit.
///
/// # Errors
///
/// A missing `claude` binary is not an error; it is reported as
/// `Ok(ClaudeOutcome::HostMissing)`. Any other spawn failure, a failure
/// to wrap the child in a [`ClaudeProcess`], and a failure in
/// `proc.finish()` are returned as [`DispatchError`].
fn run_claude_iteration(
    opts: &SpawnOpts,
    capture_path: &Path,
    capture_mode: CaptureMode,
    budget: Duration,
    until_tokens: &[String],
) -> Result<ClaudeOutcome, DispatchError> {
    let child = match claude_proc::spawn(opts) {
        Ok(c) => c,
        Err(claude_proc::Error::HostMissing) => return Ok(ClaudeOutcome::HostMissing),
        Err(other) => return Err(DispatchError::from(other)),
    };

    let proc = ClaudeProcess::from_child_with_mode(child, capture_path, capture_mode)?;
    // Two kill handles: one moves into the watchdog thread, the other
    // stays here for the sentinel short-circuit below.
    let killer = proc.killer();
    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
    let watchdog = spawn_watchdog(killer.clone(), cancel_rx, budget);

    let scanner = Scanner::new(until_tokens);
    let mut proc = proc;
    let mut hit: Option<String> = None;
    let mut stream_error: Option<claude_proc::Error> = None;
    for line in proc.by_ref() {
        match line {
            Ok(al) => {
                if let Some(h) = scanner.feed(&al.text) {
                    hit = Some(h.token);
                    // Sentinel short-circuit: kill the child so the
                    // stream does not keep reading. Both `BLOCKED`
                    // (always reserved) and user `until` tokens end
                    // the iteration here.
                    let _ = killer.kill();
                    break;
                }
            }
            Err(e) => {
                // Stop at the first stream error; it is classified
                // after the child has been reaped.
                stream_error = Some(e);
                break;
            }
        }
    }

    // Signal the watchdog before waiting for the child so it doesn't
    // kill a naturally-finished process after the fact. Dropping the
    // sender signals "cancelled" via RecvTimeoutError::Disconnected.
    drop(cancel_tx);
    // A panicked watchdog thread is treated as "did not time out".
    let timed_out = watchdog.join().unwrap_or(false);

    let (status, stderr) = match proc.finish() {
        Ok(s) => s,
        Err(e) => {
            return Err(DispatchError::from(e));
        }
    };

    // A sentinel hit outranks everything else, including the non-zero
    // exit status caused by our own kill above.
    if let Some(token) = hit {
        return Ok(ClaudeOutcome::SentinelHit { token });
    }
    if timed_out {
        return Ok(ClaudeOutcome::Timeout);
    }
    if let Some(e) = stream_error {
        // Errors yielded by the `ClaudeProcess` line iterator are
        // rare — surface as a crash so the runner can record the
        // stderr tail alongside.
        return Ok(ClaudeOutcome::Crash {
            stderr_tail: format!("stream error: {e}\nstderr: {}", tail(&stderr, 1024)),
        });
    }
    if !status.success() {
        return Ok(ClaudeOutcome::Crash {
            stderr_tail: tail(&stderr, 1024),
        });
    }
    Ok(ClaudeOutcome::CleanExit)
}
624
625/// Spawn a thread that kills `killer`'s child after `budget`, unless
626/// the main thread signals cancellation first.
627///
628/// Cancellation is signalled by dropping the `cancel_tx` side of the
629/// channel (which surfaces as `RecvTimeoutError::Disconnected` on the
630/// rx side). The channel is never `.send()`-used today — a value
631/// arriving via `Ok(_)` would also mean "cancelled; don't kill", but
632/// no caller exercises that path so we treat it as unreachable to keep
633/// the intent of the protocol explicit. Returns `true` once joined iff
634/// the watchdog actually killed the child.
635fn spawn_watchdog(
636    killer: ChildKiller,
637    rx: mpsc::Receiver<()>,
638    budget: Duration,
639) -> thread::JoinHandle<bool> {
640    thread::spawn(move || match rx.recv_timeout(budget) {
641        Err(mpsc::RecvTimeoutError::Disconnected) => false,
642        Err(mpsc::RecvTimeoutError::Timeout) => {
643            let _ = killer.kill();
644            true
645        }
646        Ok(()) => {
647            // No caller currently sends on this channel; cancellation
648            // is by drop. Reaching this arm means a future refactor
649            // added an explicit cancel path without updating this
650            // comment — treat as cancellation.
651            false
652        }
653    })
654}
655
656fn build_spawn_opts(
657    node: &Node,
658    ctx: &ExecutorContext<'_>,
659    prompt: String,
660    session: Option<Session>,
661) -> SpawnOpts {
662    SpawnOpts {
663        prompt,
664        cwd: ctx.worktree.to_path_buf(),
665        model: effective_model(node, ctx).map(str::to_string),
666        allowed_tools: node.allowed_tools.clone(),
667        session,
668        extra_args: Vec::new(),
669        bin: ctx.claude_bin.map(|p| p.to_path_buf()),
670    }
671}
672
673fn effective_model<'a>(node: &'a Node, ctx: &'a ExecutorContext<'a>) -> Option<&'a str> {
674    node.model
675        .as_deref()
676        .or_else(|| ctx.default_model.map(|s| s as &str))
677}
678
679// ── Bash / gate shared subprocess runner ────────────────────────
680
/// Outcome of a subprocess we do not stream-parse — bash and gate hooks
/// both use this path.
struct RawExit {
    /// Exit status of the direct child (after the kill, when it timed
    /// out).
    status: std::process::ExitStatus,
    /// Everything the reader thread collected from the child's stdout;
    /// empty when no stdout pipe was attached or the read failed.
    stdout: Vec<u8>,
    /// Everything the reader thread collected from the child's stderr;
    /// empty when no stderr pipe was attached or the read failed.
    stderr: Vec<u8>,
    /// `true` when the wall-clock budget expired and the child was
    /// killed.
    timed_out: bool,
}
689
/// Configure `cmd` so its child is spawned into a fresh process group
/// (Unix) or process-console group (Windows). This is what lets
/// [`kill_process_tree`] target the whole subtree when a timeout fires.
#[cfg(unix)]
fn set_new_process_group(cmd: &mut Command) {
    // A pgid of `0` asks for a new process group led by the child, with
    // a group id equal to the child's pid. `kill -KILL -$pid` (negative
    // pid) then signals every member of that group.
    const CHILD_LEADS_OWN_GROUP: i32 = 0;
    std::os::unix::process::CommandExt::process_group(cmd, CHILD_LEADS_OWN_GROUP);
}
701
#[cfg(windows)]
fn set_new_process_group(cmd: &mut Command) {
    use std::os::windows::process::CommandExt;
    // Win32 process creation flag CREATE_NEW_PROCESS_GROUP
    // (0x00000200, i.e. bit 9). It dissociates the child from our
    // console group so `taskkill /T` walks the right tree.
    const CREATE_NEW_PROCESS_GROUP: u32 = 1 << 9;
    cmd.creation_flags(CREATE_NEW_PROCESS_GROUP);
}
711
/// Deliver a terminal signal to every process in `pid`'s group (Unix)
/// or every process in the tree rooted at `pid` (Windows). Called when
/// the wall-clock budget expires so backgrounded grandchildren cannot
/// keep the drained pipe handles open past the timeout.
///
/// The work is deliberately handed to the platform's canonical
/// tree-kill utility (`kill`, `taskkill`) instead of a raw syscall: the
/// utility already copes with the edge cases (zombies, permission
/// errors, missing targets) that a minimal `libc::killpg` or
/// `OpenProcess` wrapper would re-invent. Best-effort: the utility's
/// result is ignored.
#[cfg(unix)]
fn kill_process_tree(pid: u32) {
    // A negative pid addresses the whole process group; `--` stops the
    // leading `-` from being parsed as an option.
    let group_target = format!("-{pid}");
    let mut kill = Command::new("kill");
    kill.arg("-KILL").arg("--").arg(&group_target);
    kill.stdin(Stdio::null());
    kill.stdout(Stdio::null());
    kill.stderr(Stdio::null());
    let _ = kill.status();
}
731
#[cfg(windows)]
fn kill_process_tree(pid: u32) {
    // `/T` walks the tree rooted at the pid, `/F` forces termination.
    // Best-effort: the utility's result is ignored.
    let pid_arg = pid.to_string();
    let mut taskkill = Command::new("taskkill");
    taskkill.arg("/T").arg("/F").arg("/PID").arg(&pid_arg);
    taskkill.stdin(Stdio::null());
    taskkill.stdout(Stdio::null());
    taskkill.stderr(Stdio::null());
    let _ = taskkill.status();
}
741
/// Spawn `cmd`, wait for it for at most `budget`, and collect its
/// stdout and stderr.
///
/// - `cmd`: fully configured command; this function additionally puts
///   it into its own process group before spawning. Output is only
///   collected for streams the caller configured as piped.
/// - `budget`: wall-clock limit. When it expires the whole process
///   tree is killed and the result has `timed_out == true`.
/// - `capture_stdout_at`: when `Some`, the collected stdout is written
///   to that path after the child has exited (timed-out runs included).
///
/// # Errors
///
/// `DispatchError::Spawn` when the child cannot be started,
/// `DispatchError::Wait` when waiting on it fails, and
/// `DispatchError::Io` when the capture file cannot be written.
fn run_command_with_timeout(
    cmd: &mut Command,
    budget: Duration,
    capture_stdout_at: Option<&Path>,
) -> Result<RawExit, DispatchError> {
    // Place the child in its own process group / console group so a
    // timeout kill reaches backgrounded grandchildren too. Without
    // this, a bash body that spawns a background subprocess (`& sleep
    // 60` on Unix, or any native `start` / pipeline spawn on Windows)
    // inherits our stdout/stderr pipes; killing only the direct child
    // leaves grandchildren holding the pipe handles open and
    // `stdout_thread.join()` blocks until those grandchildren die
    // naturally — potentially forever.
    set_new_process_group(cmd);
    let mut child = cmd
        .spawn()
        .map_err(|source| DispatchError::Spawn { source })?;
    let child_pid = child.id();

    // Drain stdout + stderr on dedicated threads so buffer-full
    // deadlocks never stall a long-running child, same pattern as
    // `ClaudeProcess`.
    let stdout_handle = child.stdout.take();
    let stderr_handle = child.stderr.take();
    let stdout_thread = stdout_handle.map(|mut s| {
        thread::spawn(move || -> std::io::Result<Vec<u8>> {
            let mut buf = Vec::new();
            s.read_to_end(&mut buf)?;
            Ok(buf)
        })
    });
    let stderr_thread = stderr_handle.map(|mut s| {
        thread::spawn(move || -> std::io::Result<Vec<u8>> {
            let mut buf = Vec::new();
            s.read_to_end(&mut buf)?;
            Ok(buf)
        })
    });

    use wait_timeout::ChildExt;
    let (status, timed_out) = match child
        .wait_timeout(budget)
        .map_err(|source| DispatchError::Wait { source })?
    {
        // Child exited on its own within the budget.
        Some(s) => (s, false),
        None => {
            // Tree-kill first so grandchildren release the pipe
            // handles that would otherwise block `stdout_thread.join`
            // forever. `child.kill()` is a belt-and-suspenders safety
            // net for the direct child on platforms where the
            // tree-kill utility is missing.
            kill_process_tree(child_pid);
            let _ = child.kill();
            // Reap the killed child to obtain its final exit status.
            let s = child
                .wait()
                .map_err(|source| DispatchError::Wait { source })?;
            (s, true)
        }
    };

    // Collect the drained output. A panicked reader thread or a read
    // error degrades to an empty buffer rather than failing the node.
    let stdout = stdout_thread
        .map(|h| {
            h.join()
                .unwrap_or_else(|_| Ok(Vec::new()))
                .unwrap_or_default()
        })
        .unwrap_or_default();
    let stderr = stderr_thread
        .map(|h| {
            h.join()
                .unwrap_or_else(|_| Ok(Vec::new()))
                .unwrap_or_default()
        })
        .unwrap_or_default();

    // Persist stdout in one write, replacing any previous contents of
    // the capture file.
    if let Some(cap) = capture_stdout_at {
        std::fs::write(cap, &stdout).map_err(|source| DispatchError::Io {
            path: cap.to_path_buf(),
            source,
        })?;
    }
    Ok(RawExit {
        status,
        stdout,
        stderr,
        timed_out,
    })
}
830
831fn outcome_from_exit(raw: RawExit) -> NodeOutcome {
832    if raw.timed_out {
833        return NodeOutcome::Failed {
834            kind: ErrorKind::Timeout,
835            message: Some(format!(
836                "subprocess killed after exceeding wall-clock budget; stderr: {}",
837                String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024)).into_owned()
838            )),
839        };
840    }
841    if raw.status.success() {
842        return NodeOutcome::Completed;
843    }
844    NodeOutcome::Failed {
845        kind: ErrorKind::Crash,
846        message: Some(
847            String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
848                .trim()
849                .to_string(),
850        ),
851    }
852}
853
854// ── Gate path ────────────────────────────────────────────────────
855
856fn run_gate(
857    node: &Node,
858    gate: &str,
859    ctx: &ExecutorContext<'_>,
860) -> Result<NodeOutcome, DispatchError> {
861    let script = gate_script_path(ctx.volume_root, gate);
862    // Validator checked existence at load time, but the file could
863    // have been deleted in the interval. Recheck so a missing hook
864    // surfaces as `NodeFailed{GateFailed}` rather than a spawn-time
865    // `DispatchError` the caller has to special-case.
866    if !script.is_file() {
867        return Ok(NodeOutcome::Failed {
868            kind: ErrorKind::GateFailed,
869            message: Some(format!(
870                "gate hook {} missing at dispatch time",
871                script.display()
872            )),
873        });
874    }
875    // Defence-in-depth path traversal check. The validator already
876    // rejects gate names with `/`, `\`, or `..`, but a DAG-earlier
877    // bash node could plant a file at a traversal target after
878    // validation ran — we rebuild the canonical path here and assert
879    // it's still under `dist/hooks/`.
880    if let Some(outcome) = enforce_gate_boundary(&script, ctx.volume_root, gate)? {
881        return Ok(outcome);
882    }
883    // Gate runs from the volume root, not the per-run worktree. Hooks
884    // are distro-level scripts that may need to see the whole volume
885    // layout (e.g. `.omne/lib/cfg/`), so placing them in the worktree
886    // would hide paths that only exist at the root.
887    let mut cmd = gate_command(&script);
888    cmd.current_dir(ctx.volume_root)
889        .stdin(Stdio::null())
890        .stdout(Stdio::piped())
891        .stderr(Stdio::piped())
892        .env("OMNE_RUN_ID", ctx.run_id)
893        .env("OMNE_NODE_ID", &node.id)
894        .env("OMNE_GATE_NAME", gate)
895        .env("OMNE_VOLUME_ROOT", ctx.volume_root);
896
897    let raw = run_command_with_timeout(&mut cmd, ctx.gate_timeout, None)?;
898    if raw.timed_out {
899        return Ok(NodeOutcome::Failed {
900            kind: ErrorKind::GateTimeout,
901            message: Some(format!(
902                "gate {gate} exceeded {}s budget",
903                ctx.gate_timeout.as_secs()
904            )),
905        });
906    }
907    if !raw.status.success() {
908        return Ok(NodeOutcome::Failed {
909            kind: ErrorKind::GateFailed,
910            message: Some(
911                String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
912                    .trim()
913                    .to_string(),
914            ),
915        });
916    }
917
918    let stdout_tail = if raw.stdout.is_empty() {
919        None
920    } else {
921        let trimmed = String::from_utf8_lossy(&tail_bytes(&raw.stdout, 1024))
922            .trim()
923            .to_string();
924        (!trimmed.is_empty()).then_some(trimmed)
925    };
926
927    ctx.event_log
928        .append(&Event::GatePassed(GatePassed {
929            id: new_event_id(),
930            ts: iso_utc_now(),
931            run_id: ctx.run_id.to_string(),
932            node_id: node.id.clone(),
933            gate: gate.to_string(),
934            method: GateMethod::Hook,
935            stdout: stdout_tail,
936        }))
937        .map_err(DispatchError::from)?;
938    Ok(NodeOutcome::Completed)
939}
940
941/// Assert that `script` (known to exist at this point) resolves inside
942/// `<volume_root>/.omne/dist/hooks/`. Returns `Some(NodeFailed)` when
943/// the resolved path escapes the hooks directory, so the caller can
944/// record a clean `gate.failed` terminal event instead of propagating
945/// a `DispatchError`. Returns `None` when the path is in bounds.
946///
947/// Both paths are `canonicalize`d so symlinks that leave the hooks dir
948/// are also rejected. The check is best-effort: if the hooks directory
949/// itself is missing (distro not installed) we return `None` and let
950/// the subsequent spawn fail with a standard error.
951fn enforce_gate_boundary(
952    script: &Path,
953    volume_root: &Path,
954    gate: &str,
955) -> Result<Option<NodeOutcome>, DispatchError> {
956    let hooks_dir = volume::dist_dir(volume_root).join("hooks");
957    let canonical_hooks = match hooks_dir.canonicalize() {
958        Ok(p) => p,
959        Err(_) => return Ok(None),
960    };
961    let canonical_script = script.canonicalize().map_err(|source| DispatchError::Io {
962        path: script.to_path_buf(),
963        source,
964    })?;
965    if !canonical_script.starts_with(&canonical_hooks) {
966        return Ok(Some(NodeOutcome::Failed {
967            kind: ErrorKind::GateFailed,
968            message: Some(format!(
969                "gate `{gate}` resolves outside {}: {}",
970                canonical_hooks.display(),
971                canonical_script.display()
972            )),
973        }));
974    }
975    Ok(None)
976}
977
978fn gate_script_path(volume_root: &Path, gate: &str) -> PathBuf {
979    volume::dist_dir(volume_root)
980        .join("hooks")
981        .join(format!("{gate}.{}", platform_hook_extension()))
982}
983
/// Build the (not yet spawned) command that runs a gate hook on
/// Windows: `powershell -NoProfile -File <script>`.
#[cfg(windows)]
fn gate_command(script: &Path) -> Command {
    // `-ExecutionPolicy Bypass` is deliberately absent. Passing it
    // would silently override the user's AllSigned / RemoteSigned
    // policy and drop the OS signing check for every gate hook. Distro
    // authors who ship unsigned hooks should document the
    // `Unblock-File` step in their install guide instead.
    let mut powershell = Command::new("powershell");
    powershell.args(["-NoProfile", "-File"]).arg(script);
    powershell
}
995
/// Build the (not yet spawned) command that runs a gate hook on
/// non-Windows platforms: `sh <script>`.
///
/// The script is handed to `sh` as an argument, so it does not need
/// the executable bit set.
#[cfg(not(windows))]
fn gate_command(script: &Path) -> Command {
    let mut shell = Command::new("sh");
    shell.arg(script);
    shell
}
1002
/// File extension (without the leading dot) of gate hook scripts on
/// the platform this binary was compiled for: `ps1` on Windows, `sh`
/// everywhere else.
fn platform_hook_extension() -> &'static str {
    // `cfg!` is resolved at compile time, so this is a constant per
    // target just like a pair of `#[cfg]`-gated definitions.
    if cfg!(windows) {
        "ps1"
    } else {
        "sh"
    }
}
1012
1013// ── Terminal event emission ─────────────────────────────────────
1014
1015fn emit_terminal(
1016    node: &Node,
1017    outcome: &NodeOutcome,
1018    ctx: &ExecutorContext<'_>,
1019) -> Result<(), DispatchError> {
1020    let event = match outcome {
1021        NodeOutcome::Completed => Event::NodeCompleted(NodeCompleted {
1022            id: new_event_id(),
1023            ts: iso_utc_now(),
1024            run_id: ctx.run_id.to_string(),
1025            node_id: node.id.clone(),
1026            output_path: capture_output_path_wire(ctx.run_id, &node.id),
1027        }),
1028        NodeOutcome::Failed { kind, message } => Event::NodeFailed(NodeFailed {
1029            id: new_event_id(),
1030            ts: iso_utc_now(),
1031            run_id: ctx.run_id.to_string(),
1032            node_id: node.id.clone(),
1033            error: NodeError { kind: *kind },
1034            message: message.clone(),
1035        }),
1036    };
1037    ctx.event_log.append(&event).map_err(DispatchError::from)?;
1038    Ok(())
1039}
1040
1041// ── Helpers ──────────────────────────────────────────────────────
1042
1043fn node_capture_path(ctx: &ExecutorContext<'_>, node_id: &str) -> PathBuf {
1044    volume::nodes_dir(ctx.volume_root, ctx.run_id).join(format!("{node_id}.out"))
1045}
1046
/// Forward-slash, volume-root-relative path for the wire `output_path`
/// field on `node.completed`. Single source of truth lives in
/// [`volume::node_capture_wire_path`] so a layout change in `volume.rs`
/// propagates here automatically.
///
/// `run_id` and `node_id` are forwarded unchanged; the returned string
/// is whatever the `volume` helper produces for that pair.
fn capture_output_path_wire(run_id: &str, node_id: &str) -> String {
    volume::node_capture_wire_path(run_id, node_id)
}
1054
1055fn ensure_parent_dir(path: &Path) -> Result<(), DispatchError> {
1056    if let Some(parent) = path.parent() {
1057        if !parent.as_os_str().is_empty() {
1058            std::fs::create_dir_all(parent).map_err(|source| DispatchError::Io {
1059                path: parent.to_path_buf(),
1060                source,
1061            })?;
1062        }
1063    }
1064    Ok(())
1065}
1066
1067fn node_timeout(node: &Node, ctx: &ExecutorContext<'_>) -> Duration {
1068    node.timeout
1069        .map(Duration::from_secs)
1070        .unwrap_or(ctx.default_node_timeout)
1071}
1072
1073fn new_event_id() -> String {
1074    Ulid::new().to_string().to_lowercase()
1075}
1076
1077/// Current UTC instant as `YYYY-MM-DDTHH:MM:SSZ`. Delegates to
1078/// [`crate::clock`] so `init::chrono_today` and `events.jsonl`
1079/// timestamps share a single civil-calendar implementation.
1080fn iso_utc_now() -> String {
1081    crate::clock::now_utc().format_iso_utc()
1082}
1083
/// Byte-tail of `b` at most `max_bytes` long, aligned forward to the
/// next UTF-8 start boundary.
///
/// When `b` already fits in `max_bytes` it is returned whole.
/// Otherwise the cut point `b.len() - max_bytes` is moved forward past
/// any UTF-8 continuation bytes, so a caller that later decodes the
/// slice (e.g. via `from_utf8_lossy`) does not get a leading U+FFFD
/// from a split multi-byte codepoint.
///
/// A well-formed codepoint has at most three continuation bytes, so
/// the forward scan is bounded to three. If a fourth consecutive
/// continuation byte follows, the data is not UTF-8 at this position
/// (binary output, a legacy code page, ...) and the raw, unaligned
/// tail is returned instead. Skipping without a bound would discard
/// arbitrarily much of such input, possibly all of it.
fn tail_bytes(b: &[u8], max_bytes: usize) -> Vec<u8> {
    /// Longest run of continuation bytes inside one valid codepoint.
    const MAX_CONTINUATION_BYTES: usize = 3;

    // UTF-8 continuation bytes are `10xxxxxx` (0x80..=0xBF).
    let is_continuation = |byte: u8| (byte & 0b1100_0000) == 0b1000_0000;

    if b.len() <= max_bytes {
        return b.to_vec();
    }

    let cut = b.len() - max_bytes;
    let skipped = b[cut..]
        .iter()
        .take(MAX_CONTINUATION_BYTES)
        .take_while(|&&byte| is_continuation(byte))
        .count();

    let start = match b.get(cut + skipped) {
        // Still inside a run of continuation bytes after the maximum
        // legal skip: not UTF-8, keep the full `max_bytes` tail.
        Some(&byte) if is_continuation(byte) => cut,
        // Reached a lead/ASCII byte, or ran off the end of the input.
        _ => cut + skipped,
    };
    b[start..].to_vec()
}
1102
1103/// UTF-8-safe trimmed tail of `s`. Built on top of [`tail_bytes`] so
1104/// the boundary-alignment logic lives in one place.
1105fn tail(s: &str, max_bytes: usize) -> String {
1106    let bytes = tail_bytes(s.as_bytes(), max_bytes);
1107    String::from_utf8_lossy(&bytes).trim().to_string()
1108}
1109
1110// ── Errors ──────────────────────────────────────────────────────
1111
/// Failure categories returned from [`dispatch`] itself (distinct from
/// `NodeOutcome::Failed`, which is a *recorded* node failure). These
/// are infrastructure errors that prevent the executor from even
/// deciding the node's outcome — event-log I/O, malformed pipe data
/// leaking past validation, stream-parser crashes, etc.
#[derive(Debug, thiserror::Error)]
pub enum DispatchError {
    /// The executor refused to run the node. `reason` is a
    /// human-readable explanation and `node_id` names the offending
    /// node.
    #[error("node `{node_id}` rejected by executor: {reason}")]
    InvalidNode { node_id: String, reason: String },

    /// Appending to the event log failed. Converted automatically from
    /// [`crate::event_log::Error`].
    #[error("event log error: {0}")]
    EventLog(#[from] crate::event_log::Error),

    /// An error surfaced by the `claude_proc` layer. Converted
    /// automatically from `claude_proc::Error`.
    #[error("claude subprocess error: {0}")]
    ClaudeProc(#[from] claude_proc::Error),

    /// Path-bearing I/O error (capture-file writes, iteration-marker
    /// writes, `mkdir_p`). Spawn and wait failures use
    /// [`DispatchError::Spawn`] / [`DispatchError::Wait`] instead so
    /// the error message does not render as `I/O error on : ...` with
    /// an empty path.
    #[error("I/O error on {}: {source}", path.display())]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },

    /// `Command::spawn` failed before the child ever started. Only the
    /// underlying OS error is carried: there is no useful identifier
    /// for the command beyond the originating subprocess family, which
    /// the caller can infer from context.
    #[error("failed to spawn subprocess: {source}")]
    Spawn {
        #[source]
        source: std::io::Error,
    },

    /// `wait_timeout` / `wait` on a live child reported an I/O error
    /// (EINTR left unhandled, process reaping race). Distinct from
    /// [`DispatchError::Spawn`] so telemetry can distinguish
    /// start-time failures from mid-flight failures.
    #[error("failed to wait on subprocess: {source}")]
    Wait {
        #[source]
        source: std::io::Error,
    },
}
1159
#[cfg(test)]
mod tests {
    use super::*;

    /// The timestamp is exactly `YYYY-MM-DDTHH:MM:SSZ` long and has the
    /// `T` separator and trailing `Z`.
    #[test]
    fn iso_utc_now_is_plausible_shape() {
        let stamp = iso_utc_now();
        assert_eq!(stamp.len(), 20);
        assert!(stamp.ends_with('Z'));
        assert!(stamp.contains('T'));
    }

    /// `tail` strips surrounding whitespace and never returns more
    /// than the requested number of bytes.
    #[test]
    fn tail_trims_and_bounds() {
        assert_eq!(tail("   hi  ", 1024), "hi");

        let oversized = "x".repeat(5000);
        let clipped = tail(&oversized, 10);
        assert_eq!(clipped.len(), 10);
    }

    /// The wire path is volume-root-relative and uses `/` on every
    /// platform.
    #[test]
    fn output_path_wire_uses_forward_slashes() {
        let wire_path = capture_output_path_wire("feature-01abc", "research");
        assert!(!wire_path.contains('\\'));
        assert_eq!(
            wire_path,
            ".omne/var/runs/feature-01abc/nodes/research.out"
        );
    }
}