1#![allow(dead_code)]
25
26use std::io::{Read, Write};
27use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::mpsc;
30use std::thread;
31use std::time::{Duration, Instant};
32
33use ulid::Ulid;
34use uuid::Uuid;
35
36use crate::claude_proc::{self, CaptureMode, ChildKiller, ClaudeProcess, Session, SpawnOpts};
37use crate::event_log::EventLog;
38use crate::events::{
39 ErrorKind, Event, GateMethod, GatePassed, Input, IterationStarted, NodeCompleted, NodeError,
40 NodeFailed, NodeKind, NodeStarted,
41};
42use crate::pipe::{ExecutionKind, LoopBody, Node};
43use crate::sentinel::{self, Scanner};
44use crate::volume;
45
46pub const DEFAULT_NODE_TIMEOUT: Duration = Duration::from_secs(1800);
50
51pub const GATE_TIMEOUT: Duration = Duration::from_secs(60);
53
54pub struct ExecutorContext<'a> {
59 pub volume_root: &'a Path,
61 pub run_id: &'a str,
63 pub worktree: &'a Path,
66 pub event_log: &'a EventLog,
69 pub inputs: &'a [Input],
74 pub default_model: Option<&'a str>,
77 pub claude_bin: Option<&'a Path>,
79 pub default_node_timeout: Duration,
82 pub gate_timeout: Duration,
87}
88
89impl<'a> ExecutorContext<'a> {
90 pub fn new(
94 volume_root: &'a Path,
95 run_id: &'a str,
96 worktree: &'a Path,
97 event_log: &'a EventLog,
98 inputs: &'a [Input],
99 ) -> Self {
100 Self {
101 volume_root,
102 run_id,
103 worktree,
104 event_log,
105 inputs,
106 default_model: None,
107 claude_bin: None,
108 default_node_timeout: DEFAULT_NODE_TIMEOUT,
109 gate_timeout: GATE_TIMEOUT,
110 }
111 }
112}
113
114#[derive(Debug, Clone, Eq, PartialEq)]
120pub enum NodeOutcome {
121 Completed,
122 Failed {
123 kind: ErrorKind,
124 message: Option<String>,
125 },
126}
127
128pub fn dispatch(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
136 let kind = node
137 .execution_kind()
138 .ok_or_else(|| DispatchError::InvalidNode {
139 node_id: node.id.clone(),
140 reason: "node has no execution kind (validator should have caught this)".into(),
141 })?;
142
143 let event_kind = match kind {
144 ExecutionKind::Command => NodeKind::Command,
145 ExecutionKind::Prompt => NodeKind::Prompt,
146 ExecutionKind::Bash => NodeKind::Bash,
147 ExecutionKind::Loop => NodeKind::Loop,
148 };
149
150 ctx.event_log
151 .append(&Event::NodeStarted(NodeStarted {
152 id: new_event_id(),
153 ts: iso_utc_now(),
154 run_id: ctx.run_id.to_string(),
155 node_id: node.id.clone(),
156 kind: event_kind,
157 name: None,
158 model: effective_model(node, ctx).map(str::to_string),
159 }))
160 .map_err(DispatchError::from)?;
161
162 let final_outcome = body_with_gate(node, kind, ctx);
168 emit_terminal(node, &final_outcome, ctx)?;
169 Ok(final_outcome)
170}
171
172fn body_with_gate(node: &Node, kind: ExecutionKind, ctx: &ExecutorContext<'_>) -> NodeOutcome {
179 let body_outcome = match kind {
180 ExecutionKind::Bash => run_bash(node, ctx),
181 ExecutionKind::Command | ExecutionKind::Prompt => run_ai(node, ctx),
182 ExecutionKind::Loop => run_loop(node, ctx),
183 };
184 let body_outcome = match body_outcome {
185 Ok(o) => o,
186 Err(e) => {
187 return NodeOutcome::Failed {
188 kind: ErrorKind::Crash,
189 message: Some(format!("dispatch error in node body: {e}")),
190 }
191 }
192 };
193 match body_outcome {
194 NodeOutcome::Completed => match node.gate.as_deref() {
195 Some(gate) => match run_gate(node, gate, ctx) {
196 Ok(o) => o,
197 Err(e) => NodeOutcome::Failed {
198 kind: ErrorKind::GateFailed,
199 message: Some(format!("dispatch error in gate: {e}")),
200 },
201 },
202 None => NodeOutcome::Completed,
203 },
204 other => other,
205 }
206}
207
208fn run_bash(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
211 let bash_body = node
212 .bash
213 .as_deref()
214 .ok_or_else(|| DispatchError::InvalidNode {
215 node_id: node.id.clone(),
216 reason: "bash node missing bash: body".into(),
217 })?;
218
219 let capture_path = node_capture_path(ctx, &node.id);
220 ensure_parent_dir(&capture_path)?;
221
222 let mut cmd = bash_command(bash_body);
223 cmd.current_dir(ctx.worktree)
224 .stdin(Stdio::null())
225 .stdout(Stdio::piped())
226 .stderr(Stdio::piped());
227 for input in ctx.inputs {
228 let env_key = format!("OMNE_INPUT_{}", input.key.to_uppercase());
229 cmd.env(env_key, &input.value);
230 }
231
232 let budget = node_timeout(node, ctx);
233 let outcome = run_command_with_timeout(&mut cmd, budget, Some(&capture_path))?;
234 Ok(outcome_from_exit(outcome))
235}
236
237#[cfg(windows)]
238fn bash_command(body: &str) -> Command {
239 use std::os::windows::process::CommandExt;
240 let mut cmd = Command::new("cmd");
241 cmd.raw_arg(format!("/S /C \"{body}\""));
254 cmd
255}
256
257#[cfg(not(windows))]
258fn bash_command(body: &str) -> Command {
259 let mut cmd = Command::new("sh");
260 cmd.arg("-c").arg(body);
261 cmd
262}
263
264fn run_ai(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
267 let prompt = ai_prompt_for(node)?;
268 let capture_path = node_capture_path(ctx, &node.id);
269 ensure_parent_dir(&capture_path)?;
270
271 let opts = build_spawn_opts(node, ctx, prompt, None);
272 let budget = node_timeout(node, ctx);
273 let outcome = run_claude_iteration(&opts, &capture_path, CaptureMode::Truncate, budget, &[])?;
274
275 Ok(match outcome {
276 ClaudeOutcome::CleanExit => NodeOutcome::Completed,
277 ClaudeOutcome::SentinelHit { token: _ } => {
278 NodeOutcome::Failed {
280 kind: ErrorKind::Blocked,
281 message: Some("assistant emitted BLOCKED".into()),
282 }
283 }
284 other => claude_failure(other, node, budget),
285 })
286}
287
288fn claude_failure(outcome: ClaudeOutcome, node: &Node, budget: Duration) -> NodeOutcome {
295 match outcome {
296 ClaudeOutcome::CleanExit | ClaudeOutcome::SentinelHit { .. } => {
297 unreachable!("claude_failure invoked on a non-failure outcome");
298 }
299 ClaudeOutcome::Timeout => NodeOutcome::Failed {
300 kind: ErrorKind::Timeout,
301 message: Some(format!(
302 "node {} exceeded timeout of {}s",
303 node.id,
304 budget.as_secs()
305 )),
306 },
307 ClaudeOutcome::HostMissing => NodeOutcome::Failed {
308 kind: ErrorKind::HostMissing,
309 message: Some("claude binary not found on PATH".into()),
310 },
311 ClaudeOutcome::Crash { stderr_tail } => NodeOutcome::Failed {
312 kind: ErrorKind::Crash,
313 message: Some(stderr_tail),
314 },
315 }
316}
317
318fn ai_prompt_for(node: &Node) -> Result<String, DispatchError> {
319 if let Some(prompt) = &node.prompt {
320 return Ok(prompt.clone());
321 }
322 if let Some(command) = &node.command {
323 return Ok(format!("/{command}"));
325 }
326 if node.loop_.is_some() {
327 return Err(DispatchError::InvalidNode {
328 node_id: node.id.clone(),
329 reason: "loop body should route through run_loop, not run_ai".into(),
330 });
331 }
332 Err(DispatchError::InvalidNode {
333 node_id: node.id.clone(),
334 reason: "AI node carried neither prompt: nor command:".into(),
335 })
336}
337
338fn run_loop(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
341 let body = node
342 .loop_
343 .as_ref()
344 .ok_or_else(|| DispatchError::InvalidNode {
345 node_id: node.id.clone(),
346 reason: "loop dispatch on a non-loop node".into(),
347 })?;
348
349 let prompt = loop_body_prompt(node, body)?;
350 let capture_path = node_capture_path(ctx, &node.id);
351 ensure_parent_dir(&capture_path)?;
352
353 let session_uuid = if !body.fresh_context {
356 Some(Uuid::new_v4().to_string())
357 } else {
358 None
359 };
360
361 let until_tokens: Vec<String> = vec![body.until.clone()];
362 let budget = node_timeout(node, ctx);
363 let deadline = Instant::now() + budget;
364
365 for iter in 1..=body.max_iterations {
366 let remaining = deadline.saturating_duration_since(Instant::now());
371 if remaining.is_zero() {
372 return Ok(NodeOutcome::Failed {
373 kind: ErrorKind::Timeout,
374 message: Some(format!(
375 "loop {} exceeded timeout of {}s",
376 node.id,
377 budget.as_secs()
378 )),
379 });
380 }
381
382 if iter == 1 {
387 truncate_capture(&capture_path)?;
388 }
389 let byte_offset = write_iteration_marker(&capture_path, iter)?;
390 ctx.event_log
391 .append(&Event::IterationStarted(IterationStarted {
392 id: new_event_id(),
393 ts: iso_utc_now(),
394 run_id: ctx.run_id.to_string(),
395 node_id: node.id.clone(),
396 iteration: iter,
397 byte_offset,
398 }))
399 .map_err(DispatchError::from)?;
400
401 let session = session_uuid.as_ref().map(|uuid| {
402 if iter == 1 {
403 Session::New(uuid.clone())
404 } else {
405 Session::Resume(uuid.clone())
406 }
407 });
408
409 let opts = build_spawn_opts(node, ctx, prompt.clone(), session);
410
411 let outcome = run_claude_iteration(
412 &opts,
413 &capture_path,
414 CaptureMode::Append,
415 remaining,
416 &until_tokens,
417 )?;
418
419 match outcome {
420 ClaudeOutcome::SentinelHit { token } => {
421 if token == sentinel::BLOCKED {
425 return Ok(NodeOutcome::Failed {
426 kind: ErrorKind::Blocked,
427 message: Some(format!(
428 "loop {} assistant emitted BLOCKED on iteration {iter}",
429 node.id
430 )),
431 });
432 }
433 if token == body.until {
434 return Ok(NodeOutcome::Completed);
435 }
436 return Err(DispatchError::InvalidNode {
439 node_id: node.id.clone(),
440 reason: format!("scanner returned unexpected token `{token}`"),
441 });
442 }
443 ClaudeOutcome::CleanExit => {
444 }
447 failure => return Ok(claude_failure(failure, node, budget)),
448 }
449 }
450
451 Ok(NodeOutcome::Failed {
452 kind: ErrorKind::MaxIterationsExceeded,
453 message: Some(format!(
454 "loop {} exhausted {} iterations without matching `{}`",
455 node.id, body.max_iterations, body.until
456 )),
457 })
458}
459
460fn loop_body_prompt(node: &Node, body: &LoopBody) -> Result<String, DispatchError> {
461 if let Some(prompt) = &body.prompt {
462 return Ok(prompt.clone());
463 }
464 if let Some(command) = &body.command {
465 return Ok(format!("/{command}"));
466 }
467 Err(DispatchError::InvalidNode {
468 node_id: node.id.clone(),
469 reason: "loop body carries neither prompt: nor command:".into(),
470 })
471}
472
473const ITERATION_MARKER_PREFIX: &str = "=== omne:iteration:";
482
483fn write_iteration_marker(capture_path: &Path, iter: u32) -> Result<u64, DispatchError> {
489 let mut f = std::fs::OpenOptions::new()
490 .create(true)
491 .append(true)
492 .open(capture_path)
493 .map_err(|source| DispatchError::Io {
494 path: capture_path.to_path_buf(),
495 source,
496 })?;
497 writeln!(f, "\n{ITERATION_MARKER_PREFIX}{iter} ===").map_err(|source| DispatchError::Io {
498 path: capture_path.to_path_buf(),
499 source,
500 })?;
501 f.sync_data().map_err(|source| DispatchError::Io {
502 path: capture_path.to_path_buf(),
503 source,
504 })?;
505 let meta = std::fs::metadata(capture_path).map_err(|source| DispatchError::Io {
506 path: capture_path.to_path_buf(),
507 source,
508 })?;
509 Ok(meta.len())
510}
511
512fn truncate_capture(capture_path: &Path) -> Result<(), DispatchError> {
517 std::fs::OpenOptions::new()
518 .create(true)
519 .write(true)
520 .truncate(true)
521 .open(capture_path)
522 .map(|_| ())
523 .map_err(|source| DispatchError::Io {
524 path: capture_path.to_path_buf(),
525 source,
526 })
527}
528
529enum ClaudeOutcome {
534 CleanExit,
535 SentinelHit { token: String },
536 Timeout,
537 HostMissing,
538 Crash { stderr_tail: String },
539}
540
541fn run_claude_iteration(
549 opts: &SpawnOpts,
550 capture_path: &Path,
551 capture_mode: CaptureMode,
552 budget: Duration,
553 until_tokens: &[String],
554) -> Result<ClaudeOutcome, DispatchError> {
555 let child = match claude_proc::spawn(opts) {
556 Ok(c) => c,
557 Err(claude_proc::Error::HostMissing) => return Ok(ClaudeOutcome::HostMissing),
558 Err(other) => return Err(DispatchError::from(other)),
559 };
560
561 let proc = ClaudeProcess::from_child_with_mode(child, capture_path, capture_mode)?;
562 let killer = proc.killer();
563 let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
564 let watchdog = spawn_watchdog(killer.clone(), cancel_rx, budget);
565
566 let scanner = Scanner::new(until_tokens);
567 let mut proc = proc;
568 let mut hit: Option<String> = None;
569 let mut stream_error: Option<claude_proc::Error> = None;
570 for line in proc.by_ref() {
571 match line {
572 Ok(al) => {
573 if let Some(h) = scanner.feed(&al.text) {
574 hit = Some(h.token);
575 let _ = killer.kill();
580 break;
581 }
582 }
583 Err(e) => {
584 stream_error = Some(e);
585 break;
586 }
587 }
588 }
589
590 drop(cancel_tx);
594 let timed_out = watchdog.join().unwrap_or(false);
595
596 let (status, stderr) = match proc.finish() {
597 Ok(s) => s,
598 Err(e) => {
599 return Err(DispatchError::from(e));
600 }
601 };
602
603 if let Some(token) = hit {
604 return Ok(ClaudeOutcome::SentinelHit { token });
605 }
606 if timed_out {
607 return Ok(ClaudeOutcome::Timeout);
608 }
609 if let Some(e) = stream_error {
610 return Ok(ClaudeOutcome::Crash {
614 stderr_tail: format!("stream error: {e}\nstderr: {}", tail(&stderr, 1024)),
615 });
616 }
617 if !status.success() {
618 return Ok(ClaudeOutcome::Crash {
619 stderr_tail: tail(&stderr, 1024),
620 });
621 }
622 Ok(ClaudeOutcome::CleanExit)
623}
624
625fn spawn_watchdog(
636 killer: ChildKiller,
637 rx: mpsc::Receiver<()>,
638 budget: Duration,
639) -> thread::JoinHandle<bool> {
640 thread::spawn(move || match rx.recv_timeout(budget) {
641 Err(mpsc::RecvTimeoutError::Disconnected) => false,
642 Err(mpsc::RecvTimeoutError::Timeout) => {
643 let _ = killer.kill();
644 true
645 }
646 Ok(()) => {
647 false
652 }
653 })
654}
655
656fn build_spawn_opts(
657 node: &Node,
658 ctx: &ExecutorContext<'_>,
659 prompt: String,
660 session: Option<Session>,
661) -> SpawnOpts {
662 SpawnOpts {
663 prompt,
664 cwd: ctx.worktree.to_path_buf(),
665 model: effective_model(node, ctx).map(str::to_string),
666 allowed_tools: node.allowed_tools.clone(),
667 session,
668 extra_args: Vec::new(),
669 env_vars: build_ai_env_vars(node, ctx),
670 bin: ctx.claude_bin.map(|p| p.to_path_buf()),
671 }
672}
673
674fn build_ai_env_vars(node: &Node, ctx: &ExecutorContext<'_>) -> Vec<(String, String)> {
681 let mut env = Vec::with_capacity(3 + ctx.inputs.len());
682 env.push(("OMNE_RUN_ID".to_string(), ctx.run_id.to_string()));
683 env.push(("OMNE_NODE_ID".to_string(), node.id.clone()));
684 env.push((
685 "OMNE_VOLUME_ROOT".to_string(),
686 ctx.volume_root.to_string_lossy().into_owned(),
687 ));
688 for input in ctx.inputs {
689 env.push((
690 format!("OMNE_INPUT_{}", input.key.to_uppercase()),
691 input.value.clone(),
692 ));
693 }
694 env
695}
696
697fn effective_model<'a>(node: &'a Node, ctx: &'a ExecutorContext<'a>) -> Option<&'a str> {
698 node.model
699 .as_deref()
700 .or_else(|| ctx.default_model.map(|s| s as &str))
701}
702
703struct RawExit {
708 status: std::process::ExitStatus,
709 stdout: Vec<u8>,
710 stderr: Vec<u8>,
711 timed_out: bool,
712}
713
714#[cfg(unix)]
718fn set_new_process_group(cmd: &mut Command) {
719 use std::os::unix::process::CommandExt;
720 cmd.process_group(0);
724}
725
726#[cfg(windows)]
727fn set_new_process_group(cmd: &mut Command) {
728 use std::os::windows::process::CommandExt;
729 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
733 cmd.creation_flags(CREATE_NEW_PROCESS_GROUP);
734}
735
736#[cfg(unix)]
747fn kill_process_tree(pid: u32) {
748 let _ = Command::new("kill")
749 .args(["-KILL", "--", &format!("-{pid}")])
750 .stdin(Stdio::null())
751 .stdout(Stdio::null())
752 .stderr(Stdio::null())
753 .status();
754}
755
756#[cfg(windows)]
757fn kill_process_tree(pid: u32) {
758 let _ = Command::new("taskkill")
759 .args(["/T", "/F", "/PID", &pid.to_string()])
760 .stdin(Stdio::null())
761 .stdout(Stdio::null())
762 .stderr(Stdio::null())
763 .status();
764}
765
766fn run_command_with_timeout(
767 cmd: &mut Command,
768 budget: Duration,
769 capture_stdout_at: Option<&Path>,
770) -> Result<RawExit, DispatchError> {
771 set_new_process_group(cmd);
780 let mut child = cmd
781 .spawn()
782 .map_err(|source| DispatchError::Spawn { source })?;
783 let child_pid = child.id();
784
785 let stdout_handle = child.stdout.take();
789 let stderr_handle = child.stderr.take();
790 let stdout_thread = stdout_handle.map(|mut s| {
791 thread::spawn(move || -> std::io::Result<Vec<u8>> {
792 let mut buf = Vec::new();
793 s.read_to_end(&mut buf)?;
794 Ok(buf)
795 })
796 });
797 let stderr_thread = stderr_handle.map(|mut s| {
798 thread::spawn(move || -> std::io::Result<Vec<u8>> {
799 let mut buf = Vec::new();
800 s.read_to_end(&mut buf)?;
801 Ok(buf)
802 })
803 });
804
805 use wait_timeout::ChildExt;
806 let (status, timed_out) = match child
807 .wait_timeout(budget)
808 .map_err(|source| DispatchError::Wait { source })?
809 {
810 Some(s) => (s, false),
811 None => {
812 kill_process_tree(child_pid);
818 let _ = child.kill();
819 let s = child
820 .wait()
821 .map_err(|source| DispatchError::Wait { source })?;
822 (s, true)
823 }
824 };
825
826 let stdout = stdout_thread
827 .map(|h| {
828 h.join()
829 .unwrap_or_else(|_| Ok(Vec::new()))
830 .unwrap_or_default()
831 })
832 .unwrap_or_default();
833 let stderr = stderr_thread
834 .map(|h| {
835 h.join()
836 .unwrap_or_else(|_| Ok(Vec::new()))
837 .unwrap_or_default()
838 })
839 .unwrap_or_default();
840
841 if let Some(cap) = capture_stdout_at {
842 std::fs::write(cap, &stdout).map_err(|source| DispatchError::Io {
843 path: cap.to_path_buf(),
844 source,
845 })?;
846 }
847 Ok(RawExit {
848 status,
849 stdout,
850 stderr,
851 timed_out,
852 })
853}
854
855fn outcome_from_exit(raw: RawExit) -> NodeOutcome {
856 if raw.timed_out {
857 return NodeOutcome::Failed {
858 kind: ErrorKind::Timeout,
859 message: Some(format!(
860 "subprocess killed after exceeding wall-clock budget; stderr: {}",
861 String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024)).into_owned()
862 )),
863 };
864 }
865 if raw.status.success() {
866 return NodeOutcome::Completed;
867 }
868 NodeOutcome::Failed {
869 kind: ErrorKind::Crash,
870 message: Some(
871 String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
872 .trim()
873 .to_string(),
874 ),
875 }
876}
877
878fn run_gate(
881 node: &Node,
882 gate: &str,
883 ctx: &ExecutorContext<'_>,
884) -> Result<NodeOutcome, DispatchError> {
885 let script = gate_script_path(ctx.volume_root, gate);
886 if !script.is_file() {
891 return Ok(NodeOutcome::Failed {
892 kind: ErrorKind::GateFailed,
893 message: Some(format!(
894 "gate hook {} missing at dispatch time",
895 script.display()
896 )),
897 });
898 }
899 if let Some(outcome) = enforce_gate_boundary(&script, ctx.volume_root, gate)? {
905 return Ok(outcome);
906 }
907 let mut cmd = gate_command(&script);
912 cmd.current_dir(ctx.volume_root)
913 .stdin(Stdio::null())
914 .stdout(Stdio::piped())
915 .stderr(Stdio::piped())
916 .env("OMNE_RUN_ID", ctx.run_id)
917 .env("OMNE_NODE_ID", &node.id)
918 .env("OMNE_GATE_NAME", gate)
919 .env("OMNE_VOLUME_ROOT", ctx.volume_root);
920
921 let raw = run_command_with_timeout(&mut cmd, ctx.gate_timeout, None)?;
922 if raw.timed_out {
923 return Ok(NodeOutcome::Failed {
924 kind: ErrorKind::GateTimeout,
925 message: Some(format!(
926 "gate {gate} exceeded {}s budget",
927 ctx.gate_timeout.as_secs()
928 )),
929 });
930 }
931 if !raw.status.success() {
932 return Ok(NodeOutcome::Failed {
933 kind: ErrorKind::GateFailed,
934 message: Some(
935 String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
936 .trim()
937 .to_string(),
938 ),
939 });
940 }
941
942 let stdout_tail = if raw.stdout.is_empty() {
943 None
944 } else {
945 let trimmed = String::from_utf8_lossy(&tail_bytes(&raw.stdout, 1024))
946 .trim()
947 .to_string();
948 (!trimmed.is_empty()).then_some(trimmed)
949 };
950
951 ctx.event_log
952 .append(&Event::GatePassed(GatePassed {
953 id: new_event_id(),
954 ts: iso_utc_now(),
955 run_id: ctx.run_id.to_string(),
956 node_id: node.id.clone(),
957 gate: gate.to_string(),
958 method: GateMethod::Hook,
959 stdout: stdout_tail,
960 }))
961 .map_err(DispatchError::from)?;
962 Ok(NodeOutcome::Completed)
963}
964
965fn enforce_gate_boundary(
976 script: &Path,
977 volume_root: &Path,
978 gate: &str,
979) -> Result<Option<NodeOutcome>, DispatchError> {
980 let hooks_dir = volume::dist_dir(volume_root).join("hooks");
981 let canonical_hooks = match hooks_dir.canonicalize() {
982 Ok(p) => p,
983 Err(_) => return Ok(None),
984 };
985 let canonical_script = script.canonicalize().map_err(|source| DispatchError::Io {
986 path: script.to_path_buf(),
987 source,
988 })?;
989 if !canonical_script.starts_with(&canonical_hooks) {
990 return Ok(Some(NodeOutcome::Failed {
991 kind: ErrorKind::GateFailed,
992 message: Some(format!(
993 "gate `{gate}` resolves outside {}: {}",
994 canonical_hooks.display(),
995 canonical_script.display()
996 )),
997 }));
998 }
999 Ok(None)
1000}
1001
1002fn gate_script_path(volume_root: &Path, gate: &str) -> PathBuf {
1003 volume::dist_dir(volume_root)
1004 .join("hooks")
1005 .join(format!("{gate}.{}", platform_hook_extension()))
1006}
1007
1008#[cfg(windows)]
1009fn gate_command(script: &Path) -> Command {
1010 let mut cmd = Command::new("powershell");
1011 cmd.arg("-NoProfile").arg("-File").arg(script);
1017 cmd
1018}
1019
1020#[cfg(not(windows))]
1021fn gate_command(script: &Path) -> Command {
1022 let mut cmd = Command::new("sh");
1023 cmd.arg(script);
1024 cmd
1025}
1026
1027#[cfg(windows)]
1028fn platform_hook_extension() -> &'static str {
1029 "ps1"
1030}
1031
1032#[cfg(not(windows))]
1033fn platform_hook_extension() -> &'static str {
1034 "sh"
1035}
1036
1037fn emit_terminal(
1040 node: &Node,
1041 outcome: &NodeOutcome,
1042 ctx: &ExecutorContext<'_>,
1043) -> Result<(), DispatchError> {
1044 let event = match outcome {
1045 NodeOutcome::Completed => Event::NodeCompleted(NodeCompleted {
1046 id: new_event_id(),
1047 ts: iso_utc_now(),
1048 run_id: ctx.run_id.to_string(),
1049 node_id: node.id.clone(),
1050 output_path: capture_output_path_wire(ctx.run_id, &node.id),
1051 }),
1052 NodeOutcome::Failed { kind, message } => Event::NodeFailed(NodeFailed {
1053 id: new_event_id(),
1054 ts: iso_utc_now(),
1055 run_id: ctx.run_id.to_string(),
1056 node_id: node.id.clone(),
1057 error: NodeError { kind: *kind },
1058 message: message.clone(),
1059 }),
1060 };
1061 ctx.event_log.append(&event).map_err(DispatchError::from)?;
1062 Ok(())
1063}
1064
1065fn node_capture_path(ctx: &ExecutorContext<'_>, node_id: &str) -> PathBuf {
1068 volume::nodes_dir(ctx.volume_root, ctx.run_id).join(format!("{node_id}.out"))
1069}
1070
1071fn capture_output_path_wire(run_id: &str, node_id: &str) -> String {
1076 volume::node_capture_wire_path(run_id, node_id)
1077}
1078
1079fn ensure_parent_dir(path: &Path) -> Result<(), DispatchError> {
1080 if let Some(parent) = path.parent() {
1081 if !parent.as_os_str().is_empty() {
1082 std::fs::create_dir_all(parent).map_err(|source| DispatchError::Io {
1083 path: parent.to_path_buf(),
1084 source,
1085 })?;
1086 }
1087 }
1088 Ok(())
1089}
1090
1091fn node_timeout(node: &Node, ctx: &ExecutorContext<'_>) -> Duration {
1092 node.timeout
1093 .map(Duration::from_secs)
1094 .unwrap_or(ctx.default_node_timeout)
1095}
1096
1097fn new_event_id() -> String {
1098 Ulid::new().to_string().to_lowercase()
1099}
1100
1101fn iso_utc_now() -> String {
1105 crate::clock::now_utc().format_iso_utc()
1106}
1107
1108fn tail_bytes(b: &[u8], max_bytes: usize) -> Vec<u8> {
1114 if b.len() <= max_bytes {
1115 return b.to_vec();
1116 }
1117 let mut start = b.len() - max_bytes;
1118 while start < b.len() && (b[start] & 0b1100_0000) == 0b1000_0000 {
1122 start += 1;
1123 }
1124 b[start..].to_vec()
1125}
1126
1127fn tail(s: &str, max_bytes: usize) -> String {
1130 let bytes = tail_bytes(s.as_bytes(), max_bytes);
1131 String::from_utf8_lossy(&bytes).trim().to_string()
1132}
1133
1134#[derive(Debug, thiserror::Error)]
1142pub enum DispatchError {
1143 #[error("node `{node_id}` rejected by executor: {reason}")]
1144 InvalidNode { node_id: String, reason: String },
1145
1146 #[error("event log error: {0}")]
1147 EventLog(#[from] crate::event_log::Error),
1148
1149 #[error("claude subprocess error: {0}")]
1150 ClaudeProc(#[from] claude_proc::Error),
1151
1152 #[error("I/O error on {}: {source}", path.display())]
1157 Io {
1158 path: PathBuf,
1159 #[source]
1160 source: std::io::Error,
1161 },
1162
1163 #[error("failed to spawn subprocess: {source}")]
1168 Spawn {
1169 #[source]
1170 source: std::io::Error,
1171 },
1172
1173 #[error("failed to wait on subprocess: {source}")]
1178 Wait {
1179 #[source]
1180 source: std::io::Error,
1181 },
1182}
1183
1184#[cfg(test)]
1185mod tests {
1186 use super::*;
1187 use crate::event_log::EventLog;
1188 use crate::events::Input;
1189 use crate::pipe::{Node, TriggerRule};
1190 use std::path::PathBuf;
1191 use std::time::Duration;
1192 use tempfile::TempDir;
1193
1194 #[test]
1195 fn iso_utc_now_is_plausible_shape() {
1196 let s = iso_utc_now();
1197 assert_eq!(s.len(), 20);
1198 assert!(s.ends_with('Z'));
1199 assert!(s.contains('T'));
1200 }
1201
1202 #[test]
1203 fn tail_trims_and_bounds() {
1204 assert_eq!(tail(" hi ", 1024), "hi");
1205 let long: String = "x".repeat(5000);
1206 assert_eq!(tail(&long, 10).len(), 10);
1207 }
1208
1209 #[test]
1210 fn output_path_wire_uses_forward_slashes() {
1211 let wire = capture_output_path_wire("feature-01abc", "research");
1212 assert!(!wire.contains('\\'));
1213 assert_eq!(wire, ".omne/var/runs/feature-01abc/nodes/research.out");
1214 }
1215
1216 fn bare_ai_node(id: &str) -> Node {
1217 Node {
1218 id: id.into(),
1219 depends_on: vec![],
1220 model: None,
1221 allowed_tools: vec![],
1222 gate: None,
1223 timeout: None,
1224 trigger_rule: TriggerRule::AllSuccess,
1225 command: Some("plan".into()),
1226 prompt: None,
1227 bash: None,
1228 loop_: None,
1229 }
1230 }
1231
1232 #[test]
1233 fn build_ai_env_vars_contains_base_and_input_vars() {
1234 let tmp = TempDir::new().unwrap();
1235 let log = EventLog::for_run(tmp.path(), "feature-01abc").unwrap();
1236 let volume_root: PathBuf = tmp.path().to_path_buf();
1237 let worktree: PathBuf = tmp.path().join(".omne/wt/feature-01abc");
1238 let inputs = vec![
1239 Input {
1240 key: "feature_name".into(),
1241 value: "add-hello".into(),
1242 },
1243 Input {
1244 key: "scope".into(),
1245 value: "src/api".into(),
1246 },
1247 ];
1248 let ctx = ExecutorContext {
1249 volume_root: &volume_root,
1250 run_id: "feature-01abc",
1251 worktree: &worktree,
1252 event_log: &log,
1253 inputs: &inputs,
1254 default_model: None,
1255 claude_bin: None,
1256 default_node_timeout: Duration::from_secs(60),
1257 gate_timeout: Duration::from_secs(60),
1258 };
1259 let node = bare_ai_node("plan");
1260
1261 let env = build_ai_env_vars(&node, &ctx);
1262
1263 let lookup = |k: &str| env.iter().find(|(key, _)| key == k).map(|(_, v)| v.clone());
1264 assert_eq!(lookup("OMNE_RUN_ID"), Some("feature-01abc".to_string()));
1265 assert_eq!(lookup("OMNE_NODE_ID"), Some("plan".to_string()));
1266 assert_eq!(
1267 lookup("OMNE_VOLUME_ROOT"),
1268 Some(volume_root.to_string_lossy().into_owned())
1269 );
1270 assert_eq!(
1271 lookup("OMNE_INPUT_FEATURE_NAME"),
1272 Some("add-hello".to_string())
1273 );
1274 assert_eq!(lookup("OMNE_INPUT_SCOPE"), Some("src/api".to_string()));
1275 }
1276
1277 #[test]
1278 fn build_ai_env_vars_no_inputs_still_has_base_vars() {
1279 let tmp = TempDir::new().unwrap();
1280 let log = EventLog::for_run(tmp.path(), "feature-01xyz").unwrap();
1281 let volume_root: PathBuf = tmp.path().to_path_buf();
1282 let worktree: PathBuf = tmp.path().join(".omne/wt/feature-01xyz");
1283 let ctx = ExecutorContext {
1284 volume_root: &volume_root,
1285 run_id: "feature-01xyz",
1286 worktree: &worktree,
1287 event_log: &log,
1288 inputs: &[],
1289 default_model: None,
1290 claude_bin: None,
1291 default_node_timeout: Duration::from_secs(60),
1292 gate_timeout: Duration::from_secs(60),
1293 };
1294 let node = bare_ai_node("research");
1295
1296 let env = build_ai_env_vars(&node, &ctx);
1297
1298 assert_eq!(env.len(), 3);
1299 assert!(env.iter().all(|(k, _)| !k.starts_with("OMNE_INPUT_")));
1300 }
1301}