1#![allow(dead_code)]
25
26use std::io::{Read, Write};
27use std::path::{Path, PathBuf};
28use std::process::{Command, Stdio};
29use std::sync::mpsc;
30use std::thread;
31use std::time::{Duration, Instant};
32
33use ulid::Ulid;
34use uuid::Uuid;
35
36use crate::claude_proc::{self, CaptureMode, ChildKiller, ClaudeProcess, Session, SpawnOpts};
37use crate::event_log::EventLog;
38use crate::events::{
39 ErrorKind, Event, GateMethod, GatePassed, Input, IterationStarted, NodeCompleted, NodeError,
40 NodeFailed, NodeKind, NodeStarted,
41};
42use crate::pipe::{ExecutionKind, LoopBody, Node};
43use crate::sentinel::{self, Scanner};
44use crate::volume;
45
/// Default value of [`ExecutorContext::default_node_timeout`]: thirty minutes.
pub const DEFAULT_NODE_TIMEOUT: Duration = Duration::from_secs(30 * 60);

/// Default value of [`ExecutorContext::gate_timeout`]: one minute.
pub const GATE_TIMEOUT: Duration = Duration::from_secs(60);
53
54pub struct ExecutorContext<'a> {
59 pub volume_root: &'a Path,
61 pub run_id: &'a str,
63 pub worktree: &'a Path,
66 pub event_log: &'a EventLog,
69 pub inputs: &'a [Input],
74 pub default_model: Option<&'a str>,
77 pub claude_bin: Option<&'a Path>,
79 pub default_node_timeout: Duration,
82 pub gate_timeout: Duration,
87}
88
89impl<'a> ExecutorContext<'a> {
90 pub fn new(
94 volume_root: &'a Path,
95 run_id: &'a str,
96 worktree: &'a Path,
97 event_log: &'a EventLog,
98 inputs: &'a [Input],
99 ) -> Self {
100 Self {
101 volume_root,
102 run_id,
103 worktree,
104 event_log,
105 inputs,
106 default_model: None,
107 claude_bin: None,
108 default_node_timeout: DEFAULT_NODE_TIMEOUT,
109 gate_timeout: GATE_TIMEOUT,
110 }
111 }
112}
113
/// Final result of running one node (body plus optional gate).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum NodeOutcome {
    /// The node body succeeded and, if a gate was configured, the gate passed.
    Completed,
    /// The node failed; reported through a `NodeFailed` event.
    Failed {
        /// Classification of the failure.
        kind: ErrorKind,
        /// Optional human-readable detail (for example a stderr tail).
        message: Option<String>,
    },
}
127
128pub fn dispatch(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
136 let kind = node
137 .execution_kind()
138 .ok_or_else(|| DispatchError::InvalidNode {
139 node_id: node.id.clone(),
140 reason: "node has no execution kind (validator should have caught this)".into(),
141 })?;
142
143 let event_kind = match kind {
144 ExecutionKind::Command => NodeKind::Command,
145 ExecutionKind::Prompt => NodeKind::Prompt,
146 ExecutionKind::Bash => NodeKind::Bash,
147 ExecutionKind::Loop => NodeKind::Loop,
148 };
149
150 ctx.event_log
151 .append(&Event::NodeStarted(NodeStarted {
152 id: new_event_id(),
153 ts: iso_utc_now(),
154 run_id: ctx.run_id.to_string(),
155 node_id: node.id.clone(),
156 kind: event_kind,
157 name: None,
158 model: effective_model(node, ctx).map(str::to_string),
159 }))
160 .map_err(DispatchError::from)?;
161
162 let final_outcome = body_with_gate(node, kind, ctx);
168 emit_terminal(node, &final_outcome, ctx)?;
169 Ok(final_outcome)
170}
171
172fn body_with_gate(node: &Node, kind: ExecutionKind, ctx: &ExecutorContext<'_>) -> NodeOutcome {
179 let body_outcome = match kind {
180 ExecutionKind::Bash => run_bash(node, ctx),
181 ExecutionKind::Command | ExecutionKind::Prompt => run_ai(node, ctx),
182 ExecutionKind::Loop => run_loop(node, ctx),
183 };
184 let body_outcome = match body_outcome {
185 Ok(o) => o,
186 Err(e) => {
187 return NodeOutcome::Failed {
188 kind: ErrorKind::Crash,
189 message: Some(format!("dispatch error in node body: {e}")),
190 }
191 }
192 };
193 match body_outcome {
194 NodeOutcome::Completed => match node.gate.as_deref() {
195 Some(gate) => match run_gate(node, gate, ctx) {
196 Ok(o) => o,
197 Err(e) => NodeOutcome::Failed {
198 kind: ErrorKind::GateFailed,
199 message: Some(format!("dispatch error in gate: {e}")),
200 },
201 },
202 None => NodeOutcome::Completed,
203 },
204 other => other,
205 }
206}
207
208fn run_bash(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
211 let bash_body = node
212 .bash
213 .as_deref()
214 .ok_or_else(|| DispatchError::InvalidNode {
215 node_id: node.id.clone(),
216 reason: "bash node missing bash: body".into(),
217 })?;
218
219 let capture_path = node_capture_path(ctx, &node.id);
220 ensure_parent_dir(&capture_path)?;
221
222 let mut cmd = bash_command(bash_body);
223 cmd.current_dir(ctx.worktree)
224 .stdin(Stdio::null())
225 .stdout(Stdio::piped())
226 .stderr(Stdio::piped());
227 for input in ctx.inputs {
228 let env_key = format!("OMNE_INPUT_{}", input.key.to_uppercase());
229 cmd.env(env_key, &input.value);
230 }
231
232 let budget = node_timeout(node, ctx);
233 let outcome = run_command_with_timeout(&mut cmd, budget, Some(&capture_path))?;
234 Ok(outcome_from_exit(outcome))
235}
236
/// Builds the shell invocation for a bash node body (Windows: `cmd /S /C "<body>"`).
///
/// The argument string is handed over with `raw_arg`, i.e. without the standard
/// library's own argument quoting.
#[cfg(windows)]
fn bash_command(body: &str) -> Command {
    use std::os::windows::process::CommandExt;
    let mut shell = Command::new("cmd");
    let raw = format!("/S /C \"{body}\"");
    shell.raw_arg(raw);
    shell
}

/// Builds the shell invocation for a bash node body (non-Windows: `sh -c <body>`).
#[cfg(not(windows))]
fn bash_command(body: &str) -> Command {
    let mut shell = Command::new("sh");
    shell.args(["-c", body]);
    shell
}
263
264fn run_ai(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
267 let prompt = ai_prompt_for(node)?;
268 let capture_path = node_capture_path(ctx, &node.id);
269 ensure_parent_dir(&capture_path)?;
270
271 let opts = build_spawn_opts(node, ctx, prompt, None);
272 let budget = node_timeout(node, ctx);
273 let outcome = run_claude_iteration(&opts, &capture_path, CaptureMode::Truncate, budget, &[])?;
274
275 Ok(match outcome {
276 ClaudeOutcome::CleanExit => NodeOutcome::Completed,
277 ClaudeOutcome::SentinelHit { token: _ } => {
278 NodeOutcome::Failed {
280 kind: ErrorKind::Blocked,
281 message: Some("assistant emitted BLOCKED".into()),
282 }
283 }
284 other => claude_failure(other, node, budget),
285 })
286}
287
288fn claude_failure(outcome: ClaudeOutcome, node: &Node, budget: Duration) -> NodeOutcome {
295 match outcome {
296 ClaudeOutcome::CleanExit | ClaudeOutcome::SentinelHit { .. } => {
297 unreachable!("claude_failure invoked on a non-failure outcome");
298 }
299 ClaudeOutcome::Timeout => NodeOutcome::Failed {
300 kind: ErrorKind::Timeout,
301 message: Some(format!(
302 "node {} exceeded timeout of {}s",
303 node.id,
304 budget.as_secs()
305 )),
306 },
307 ClaudeOutcome::HostMissing => NodeOutcome::Failed {
308 kind: ErrorKind::HostMissing,
309 message: Some("claude binary not found on PATH".into()),
310 },
311 ClaudeOutcome::Crash { stderr_tail } => NodeOutcome::Failed {
312 kind: ErrorKind::Crash,
313 message: Some(stderr_tail),
314 },
315 }
316}
317
318fn ai_prompt_for(node: &Node) -> Result<String, DispatchError> {
319 if let Some(prompt) = &node.prompt {
320 return Ok(prompt.clone());
321 }
322 if let Some(command) = &node.command {
323 return Ok(format!("/{command}"));
325 }
326 if node.loop_.is_some() {
327 return Err(DispatchError::InvalidNode {
328 node_id: node.id.clone(),
329 reason: "loop body should route through run_loop, not run_ai".into(),
330 });
331 }
332 Err(DispatchError::InvalidNode {
333 node_id: node.id.clone(),
334 reason: "AI node carried neither prompt: nor command:".into(),
335 })
336}
337
/// Runs a loop node: invokes the assistant repeatedly with the same prompt until the
/// `until` token is seen, the assistant reports BLOCKED, an iteration fails, the
/// node's wall-clock budget runs out, or `max_iterations` is exhausted.
///
/// All iterations append to one capture file; each is preceded by an iteration marker
/// and an `IterationStarted` event carrying the byte offset just after that marker.
///
/// # Errors
///
/// Returns [`DispatchError::InvalidNode`] for a node without a loop body (or a body
/// without prompt/command, or an unexpected scanner token), and propagates I/O,
/// event-log and subprocess errors.
fn run_loop(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
    let body = node
        .loop_
        .as_ref()
        .ok_or_else(|| DispatchError::InvalidNode {
            node_id: node.id.clone(),
            reason: "loop dispatch on a non-loop node".into(),
        })?;

    let prompt = loop_body_prompt(node, body)?;
    let capture_path = node_capture_path(ctx, &node.id);
    ensure_parent_dir(&capture_path)?;

    // Unless the body asks for a fresh context each time, all iterations share one
    // session id: created on iteration 1 and resumed afterwards (see below).
    let session_uuid = if !body.fresh_context {
        Some(Uuid::new_v4().to_string())
    } else {
        None
    };

    let until_tokens: Vec<String> = vec![body.until.clone()];
    // The budget covers the whole loop, not each iteration.
    let budget = node_timeout(node, ctx);
    let deadline = Instant::now() + budget;

    for iter in 1..=body.max_iterations {
        // Time left for this iteration; zero means the loop budget is already spent.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Ok(NodeOutcome::Failed {
                kind: ErrorKind::Timeout,
                message: Some(format!(
                    "loop {} exceeded timeout of {}s",
                    node.id,
                    budget.as_secs()
                )),
            });
        }

        // Start from an empty capture file on the first iteration only; later
        // iterations append.
        if iter == 1 {
            truncate_capture(&capture_path)?;
        }
        let byte_offset = write_iteration_marker(&capture_path, iter)?;
        ctx.event_log
            .append(&Event::IterationStarted(IterationStarted {
                id: new_event_id(),
                ts: iso_utc_now(),
                run_id: ctx.run_id.to_string(),
                node_id: node.id.clone(),
                iteration: iter,
                byte_offset,
            }))
            .map_err(DispatchError::from)?;

        let session = session_uuid.as_ref().map(|uuid| {
            if iter == 1 {
                Session::New(uuid.clone())
            } else {
                Session::Resume(uuid.clone())
            }
        });

        let opts = build_spawn_opts(node, ctx, prompt.clone(), session);

        let outcome = run_claude_iteration(
            &opts,
            &capture_path,
            CaptureMode::Append,
            remaining,
            &until_tokens,
        )?;

        match outcome {
            ClaudeOutcome::SentinelHit { token } => {
                // BLOCKED is checked before the `until` token.
                // NOTE(review): presumably the scanner always watches for BLOCKED in
                // addition to the supplied tokens - confirm in `sentinel`.
                if token == sentinel::BLOCKED {
                    return Ok(NodeOutcome::Failed {
                        kind: ErrorKind::Blocked,
                        message: Some(format!(
                            "loop {} assistant emitted BLOCKED on iteration {iter}",
                            node.id
                        )),
                    });
                }
                if token == body.until {
                    return Ok(NodeOutcome::Completed);
                }
                return Err(DispatchError::InvalidNode {
                    node_id: node.id.clone(),
                    reason: format!("scanner returned unexpected token `{token}`"),
                });
            }
            ClaudeOutcome::CleanExit => {
                // Exited without emitting a sentinel: go around again.
            }
            failure => return Ok(claude_failure(failure, node, budget)),
        }
    }

    Ok(NodeOutcome::Failed {
        kind: ErrorKind::MaxIterationsExceeded,
        message: Some(format!(
            "loop {} exhausted {} iterations without matching `{}`",
            node.id, body.max_iterations, body.until
        )),
    })
}
459
460fn loop_body_prompt(node: &Node, body: &LoopBody) -> Result<String, DispatchError> {
461 if let Some(prompt) = &body.prompt {
462 return Ok(prompt.clone());
463 }
464 if let Some(command) = &body.command {
465 return Ok(format!("/{command}"));
466 }
467 Err(DispatchError::InvalidNode {
468 node_id: node.id.clone(),
469 reason: "loop body carries neither prompt: nor command:".into(),
470 })
471}
472
473const ITERATION_MARKER_PREFIX: &str = "=== omne:iteration:";
482
483fn write_iteration_marker(capture_path: &Path, iter: u32) -> Result<u64, DispatchError> {
489 let mut f = std::fs::OpenOptions::new()
490 .create(true)
491 .append(true)
492 .open(capture_path)
493 .map_err(|source| DispatchError::Io {
494 path: capture_path.to_path_buf(),
495 source,
496 })?;
497 writeln!(f, "\n{ITERATION_MARKER_PREFIX}{iter} ===").map_err(|source| DispatchError::Io {
498 path: capture_path.to_path_buf(),
499 source,
500 })?;
501 f.sync_data().map_err(|source| DispatchError::Io {
502 path: capture_path.to_path_buf(),
503 source,
504 })?;
505 let meta = std::fs::metadata(capture_path).map_err(|source| DispatchError::Io {
506 path: capture_path.to_path_buf(),
507 source,
508 })?;
509 Ok(meta.len())
510}
511
512fn truncate_capture(capture_path: &Path) -> Result<(), DispatchError> {
517 std::fs::OpenOptions::new()
518 .create(true)
519 .write(true)
520 .truncate(true)
521 .open(capture_path)
522 .map(|_| ())
523 .map_err(|source| DispatchError::Io {
524 path: capture_path.to_path_buf(),
525 source,
526 })
527}
528
/// How a single assistant invocation ended, as classified by `run_claude_iteration`.
enum ClaudeOutcome {
    /// The process exited successfully without any sentinel being matched.
    CleanExit,
    /// The scanner matched `token` in the output; the process was then killed.
    SentinelHit { token: String },
    /// The watchdog killed the process after the budget elapsed.
    Timeout,
    /// The assistant binary could not be found at spawn time.
    HostMissing,
    /// The output stream failed or the process exited unsuccessfully;
    /// `stderr_tail` carries the diagnostic text.
    Crash { stderr_tail: String },
}
540
541fn run_claude_iteration(
549 opts: &SpawnOpts,
550 capture_path: &Path,
551 capture_mode: CaptureMode,
552 budget: Duration,
553 until_tokens: &[String],
554) -> Result<ClaudeOutcome, DispatchError> {
555 let child = match claude_proc::spawn(opts) {
556 Ok(c) => c,
557 Err(claude_proc::Error::HostMissing) => return Ok(ClaudeOutcome::HostMissing),
558 Err(other) => return Err(DispatchError::from(other)),
559 };
560
561 let proc = ClaudeProcess::from_child_with_mode(child, capture_path, capture_mode)?;
562 let killer = proc.killer();
563 let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
564 let watchdog = spawn_watchdog(killer.clone(), cancel_rx, budget);
565
566 let scanner = Scanner::new(until_tokens);
567 let mut proc = proc;
568 let mut hit: Option<String> = None;
569 let mut stream_error: Option<claude_proc::Error> = None;
570 for line in proc.by_ref() {
571 match line {
572 Ok(al) => {
573 if let Some(h) = scanner.feed(&al.text) {
574 hit = Some(h.token);
575 let _ = killer.kill();
580 break;
581 }
582 }
583 Err(e) => {
584 stream_error = Some(e);
585 break;
586 }
587 }
588 }
589
590 drop(cancel_tx);
594 let timed_out = watchdog.join().unwrap_or(false);
595
596 let (status, stderr) = match proc.finish() {
597 Ok(s) => s,
598 Err(e) => {
599 return Err(DispatchError::from(e));
600 }
601 };
602
603 if let Some(token) = hit {
604 return Ok(ClaudeOutcome::SentinelHit { token });
605 }
606 if timed_out {
607 return Ok(ClaudeOutcome::Timeout);
608 }
609 if let Some(e) = stream_error {
610 return Ok(ClaudeOutcome::Crash {
614 stderr_tail: format!("stream error: {e}\nstderr: {}", tail(&stderr, 1024)),
615 });
616 }
617 if !status.success() {
618 return Ok(ClaudeOutcome::Crash {
619 stderr_tail: tail(&stderr, 1024),
620 });
621 }
622 Ok(ClaudeOutcome::CleanExit)
623}
624
625fn spawn_watchdog(
636 killer: ChildKiller,
637 rx: mpsc::Receiver<()>,
638 budget: Duration,
639) -> thread::JoinHandle<bool> {
640 thread::spawn(move || match rx.recv_timeout(budget) {
641 Err(mpsc::RecvTimeoutError::Disconnected) => false,
642 Err(mpsc::RecvTimeoutError::Timeout) => {
643 let _ = killer.kill();
644 true
645 }
646 Ok(()) => {
647 false
652 }
653 })
654}
655
656fn build_spawn_opts(
657 node: &Node,
658 ctx: &ExecutorContext<'_>,
659 prompt: String,
660 session: Option<Session>,
661) -> SpawnOpts {
662 SpawnOpts {
663 prompt,
664 cwd: ctx.worktree.to_path_buf(),
665 model: effective_model(node, ctx).map(str::to_string),
666 allowed_tools: node.allowed_tools.clone(),
667 session,
668 extra_args: Vec::new(),
669 bin: ctx.claude_bin.map(|p| p.to_path_buf()),
670 }
671}
672
673fn effective_model<'a>(node: &'a Node, ctx: &'a ExecutorContext<'a>) -> Option<&'a str> {
674 node.model
675 .as_deref()
676 .or_else(|| ctx.default_model.map(|s| s as &str))
677}
678
/// Raw result of `run_command_with_timeout`, before it is mapped to a node outcome.
struct RawExit {
    /// Exit status reported when the subprocess was reaped.
    status: std::process::ExitStatus,
    /// Everything collected from the subprocess's stdout pipe.
    stdout: Vec<u8>,
    /// Everything collected from the subprocess's stderr pipe.
    stderr: Vec<u8>,
    /// `true` when the subprocess was killed for exceeding its budget.
    timed_out: bool,
}
689
/// Makes the command's child the leader of a new process group (Unix).
///
/// `kill_process_tree` later signals that group by its id, which equals the child's
/// pid because the group id requested here is `0`.
#[cfg(unix)]
fn set_new_process_group(cmd: &mut Command) {
    use std::os::unix::process::CommandExt as _;
    cmd.process_group(0);
}

/// Starts the command's child in a new process group (Windows) by setting the
/// `CREATE_NEW_PROCESS_GROUP` creation flag.
#[cfg(windows)]
fn set_new_process_group(cmd: &mut Command) {
    use std::os::windows::process::CommandExt as _;
    const CREATE_NEW_PROCESS_GROUP: u32 = 0x200;
    cmd.creation_flags(CREATE_NEW_PROCESS_GROUP);
}
711
/// Best-effort SIGKILL of the whole process group led by `pid` (Unix).
///
/// Shells out to `kill -KILL -- -<pid>`; the negative id addresses the process group.
/// All stdio is discarded and any failure is ignored.
#[cfg(unix)]
fn kill_process_tree(pid: u32) {
    let group = format!("-{pid}");
    let mut killer = Command::new("kill");
    killer
        .arg("-KILL")
        .arg("--")
        .arg(&group)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    let _ = killer.status();
}

/// Best-effort forced termination of `pid` and its child processes (Windows).
///
/// Shells out to `taskkill /T /F /PID <pid>`; all stdio is discarded and any failure
/// is ignored.
#[cfg(windows)]
fn kill_process_tree(pid: u32) {
    let target = pid.to_string();
    let mut killer = Command::new("taskkill");
    killer
        .arg("/T")
        .arg("/F")
        .arg("/PID")
        .arg(&target)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    let _ = killer.status();
}
741
742fn run_command_with_timeout(
743 cmd: &mut Command,
744 budget: Duration,
745 capture_stdout_at: Option<&Path>,
746) -> Result<RawExit, DispatchError> {
747 set_new_process_group(cmd);
756 let mut child = cmd
757 .spawn()
758 .map_err(|source| DispatchError::Spawn { source })?;
759 let child_pid = child.id();
760
761 let stdout_handle = child.stdout.take();
765 let stderr_handle = child.stderr.take();
766 let stdout_thread = stdout_handle.map(|mut s| {
767 thread::spawn(move || -> std::io::Result<Vec<u8>> {
768 let mut buf = Vec::new();
769 s.read_to_end(&mut buf)?;
770 Ok(buf)
771 })
772 });
773 let stderr_thread = stderr_handle.map(|mut s| {
774 thread::spawn(move || -> std::io::Result<Vec<u8>> {
775 let mut buf = Vec::new();
776 s.read_to_end(&mut buf)?;
777 Ok(buf)
778 })
779 });
780
781 use wait_timeout::ChildExt;
782 let (status, timed_out) = match child
783 .wait_timeout(budget)
784 .map_err(|source| DispatchError::Wait { source })?
785 {
786 Some(s) => (s, false),
787 None => {
788 kill_process_tree(child_pid);
794 let _ = child.kill();
795 let s = child
796 .wait()
797 .map_err(|source| DispatchError::Wait { source })?;
798 (s, true)
799 }
800 };
801
802 let stdout = stdout_thread
803 .map(|h| {
804 h.join()
805 .unwrap_or_else(|_| Ok(Vec::new()))
806 .unwrap_or_default()
807 })
808 .unwrap_or_default();
809 let stderr = stderr_thread
810 .map(|h| {
811 h.join()
812 .unwrap_or_else(|_| Ok(Vec::new()))
813 .unwrap_or_default()
814 })
815 .unwrap_or_default();
816
817 if let Some(cap) = capture_stdout_at {
818 std::fs::write(cap, &stdout).map_err(|source| DispatchError::Io {
819 path: cap.to_path_buf(),
820 source,
821 })?;
822 }
823 Ok(RawExit {
824 status,
825 stdout,
826 stderr,
827 timed_out,
828 })
829}
830
831fn outcome_from_exit(raw: RawExit) -> NodeOutcome {
832 if raw.timed_out {
833 return NodeOutcome::Failed {
834 kind: ErrorKind::Timeout,
835 message: Some(format!(
836 "subprocess killed after exceeding wall-clock budget; stderr: {}",
837 String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024)).into_owned()
838 )),
839 };
840 }
841 if raw.status.success() {
842 return NodeOutcome::Completed;
843 }
844 NodeOutcome::Failed {
845 kind: ErrorKind::Crash,
846 message: Some(
847 String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
848 .trim()
849 .to_string(),
850 ),
851 }
852}
853
854fn run_gate(
857 node: &Node,
858 gate: &str,
859 ctx: &ExecutorContext<'_>,
860) -> Result<NodeOutcome, DispatchError> {
861 let script = gate_script_path(ctx.volume_root, gate);
862 if !script.is_file() {
867 return Ok(NodeOutcome::Failed {
868 kind: ErrorKind::GateFailed,
869 message: Some(format!(
870 "gate hook {} missing at dispatch time",
871 script.display()
872 )),
873 });
874 }
875 if let Some(outcome) = enforce_gate_boundary(&script, ctx.volume_root, gate)? {
881 return Ok(outcome);
882 }
883 let mut cmd = gate_command(&script);
888 cmd.current_dir(ctx.volume_root)
889 .stdin(Stdio::null())
890 .stdout(Stdio::piped())
891 .stderr(Stdio::piped())
892 .env("OMNE_RUN_ID", ctx.run_id)
893 .env("OMNE_NODE_ID", &node.id)
894 .env("OMNE_GATE_NAME", gate)
895 .env("OMNE_VOLUME_ROOT", ctx.volume_root);
896
897 let raw = run_command_with_timeout(&mut cmd, ctx.gate_timeout, None)?;
898 if raw.timed_out {
899 return Ok(NodeOutcome::Failed {
900 kind: ErrorKind::GateTimeout,
901 message: Some(format!(
902 "gate {gate} exceeded {}s budget",
903 ctx.gate_timeout.as_secs()
904 )),
905 });
906 }
907 if !raw.status.success() {
908 return Ok(NodeOutcome::Failed {
909 kind: ErrorKind::GateFailed,
910 message: Some(
911 String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024))
912 .trim()
913 .to_string(),
914 ),
915 });
916 }
917
918 let stdout_tail = if raw.stdout.is_empty() {
919 None
920 } else {
921 let trimmed = String::from_utf8_lossy(&tail_bytes(&raw.stdout, 1024))
922 .trim()
923 .to_string();
924 (!trimmed.is_empty()).then_some(trimmed)
925 };
926
927 ctx.event_log
928 .append(&Event::GatePassed(GatePassed {
929 id: new_event_id(),
930 ts: iso_utc_now(),
931 run_id: ctx.run_id.to_string(),
932 node_id: node.id.clone(),
933 gate: gate.to_string(),
934 method: GateMethod::Hook,
935 stdout: stdout_tail,
936 }))
937 .map_err(DispatchError::from)?;
938 Ok(NodeOutcome::Completed)
939}
940
941fn enforce_gate_boundary(
952 script: &Path,
953 volume_root: &Path,
954 gate: &str,
955) -> Result<Option<NodeOutcome>, DispatchError> {
956 let hooks_dir = volume::dist_dir(volume_root).join("hooks");
957 let canonical_hooks = match hooks_dir.canonicalize() {
958 Ok(p) => p,
959 Err(_) => return Ok(None),
960 };
961 let canonical_script = script.canonicalize().map_err(|source| DispatchError::Io {
962 path: script.to_path_buf(),
963 source,
964 })?;
965 if !canonical_script.starts_with(&canonical_hooks) {
966 return Ok(Some(NodeOutcome::Failed {
967 kind: ErrorKind::GateFailed,
968 message: Some(format!(
969 "gate `{gate}` resolves outside {}: {}",
970 canonical_hooks.display(),
971 canonical_script.display()
972 )),
973 }));
974 }
975 Ok(None)
976}
977
978fn gate_script_path(volume_root: &Path, gate: &str) -> PathBuf {
979 volume::dist_dir(volume_root)
980 .join("hooks")
981 .join(format!("{gate}.{}", platform_hook_extension()))
982}
983
/// Builds the invocation for a gate hook script (Windows:
/// `powershell -NoProfile -File <script>`).
#[cfg(windows)]
fn gate_command(script: &Path) -> Command {
    let mut hook = Command::new("powershell");
    hook.args(["-NoProfile", "-File"]).arg(script);
    hook
}

/// Builds the invocation for a gate hook script (non-Windows: `sh <script>`).
#[cfg(not(windows))]
fn gate_command(script: &Path) -> Command {
    let mut hook = Command::new("sh");
    hook.args([script]);
    hook
}
1002
/// File extension of gate hook scripts on this platform: `ps1` on Windows, `sh`
/// everywhere else.
fn platform_hook_extension() -> &'static str {
    if cfg!(windows) {
        "ps1"
    } else {
        "sh"
    }
}
1012
1013fn emit_terminal(
1016 node: &Node,
1017 outcome: &NodeOutcome,
1018 ctx: &ExecutorContext<'_>,
1019) -> Result<(), DispatchError> {
1020 let event = match outcome {
1021 NodeOutcome::Completed => Event::NodeCompleted(NodeCompleted {
1022 id: new_event_id(),
1023 ts: iso_utc_now(),
1024 run_id: ctx.run_id.to_string(),
1025 node_id: node.id.clone(),
1026 output_path: capture_output_path_wire(ctx.run_id, &node.id),
1027 }),
1028 NodeOutcome::Failed { kind, message } => Event::NodeFailed(NodeFailed {
1029 id: new_event_id(),
1030 ts: iso_utc_now(),
1031 run_id: ctx.run_id.to_string(),
1032 node_id: node.id.clone(),
1033 error: NodeError { kind: *kind },
1034 message: message.clone(),
1035 }),
1036 };
1037 ctx.event_log.append(&event).map_err(DispatchError::from)?;
1038 Ok(())
1039}
1040
1041fn node_capture_path(ctx: &ExecutorContext<'_>, node_id: &str) -> PathBuf {
1044 volume::nodes_dir(ctx.volume_root, ctx.run_id).join(format!("{node_id}.out"))
1045}
1046
/// Path of a node's capture file as recorded in events.
///
/// Delegates to `volume::node_capture_wire_path`; the unit test in this file pins the
/// shape to `.omne/var/runs/<run_id>/nodes/<node_id>.out` with forward slashes.
fn capture_output_path_wire(run_id: &str, node_id: &str) -> String {
    volume::node_capture_wire_path(run_id, node_id)
}
1054
1055fn ensure_parent_dir(path: &Path) -> Result<(), DispatchError> {
1056 if let Some(parent) = path.parent() {
1057 if !parent.as_os_str().is_empty() {
1058 std::fs::create_dir_all(parent).map_err(|source| DispatchError::Io {
1059 path: parent.to_path_buf(),
1060 source,
1061 })?;
1062 }
1063 }
1064 Ok(())
1065}
1066
1067fn node_timeout(node: &Node, ctx: &ExecutorContext<'_>) -> Duration {
1068 node.timeout
1069 .map(Duration::from_secs)
1070 .unwrap_or(ctx.default_node_timeout)
1071}
1072
1073fn new_event_id() -> String {
1074 Ulid::new().to_string().to_lowercase()
1075}
1076
1077fn iso_utc_now() -> String {
1081 crate::clock::now_utc().format_iso_utc()
1082}
1083
/// Returns at most the last `max_bytes` bytes of `b`.
///
/// When the buffer has to be cut, any UTF-8 continuation bytes (`10xxxxxx`) at the
/// start of the tail are dropped so the result does not begin in the middle of a
/// character; the result may therefore be shorter than `max_bytes`. A buffer that
/// already fits is returned unchanged.
fn tail_bytes(b: &[u8], max_bytes: usize) -> Vec<u8> {
    let cut = match b.len().checked_sub(max_bytes) {
        // Fits entirely (shorter than, or exactly, `max_bytes`): nothing to trim.
        None | Some(0) => return b.to_vec(),
        Some(cut) => cut,
    };
    let dangling = b[cut..]
        .iter()
        .take_while(|&&byte| byte & 0b1100_0000 == 0b1000_0000)
        .count();
    b[cut + dangling..].to_vec()
}

/// Returns the last `max_bytes` bytes of `s` (cut on a character boundary) with
/// surrounding whitespace trimmed.
fn tail(s: &str, max_bytes: usize) -> String {
    let kept = tail_bytes(s.as_bytes(), max_bytes);
    let text = String::from_utf8_lossy(&kept);
    text.trim().to_owned()
}
1109
/// Infrastructure-level failures of the executor, as opposed to node failures, which
/// are reported through [`NodeOutcome::Failed`].
#[derive(Debug, thiserror::Error)]
pub enum DispatchError {
    /// The node cannot be executed as described (for example, no execution kind or a
    /// missing body).
    #[error("node `{node_id}` rejected by executor: {reason}")]
    InvalidNode { node_id: String, reason: String },

    /// Appending to the event log failed.
    #[error("event log error: {0}")]
    EventLog(#[from] crate::event_log::Error),

    /// The assistant subprocess layer reported an error.
    #[error("claude subprocess error: {0}")]
    ClaudeProc(#[from] claude_proc::Error),

    /// A filesystem operation on `path` failed.
    #[error("I/O error on {}: {source}", path.display())]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },

    /// A subprocess could not be started.
    #[error("failed to spawn subprocess: {source}")]
    Spawn {
        #[source]
        source: std::io::Error,
    },

    /// Waiting on a subprocess failed.
    #[error("failed to wait on subprocess: {source}")]
    Wait {
        #[source]
        source: std::io::Error,
    },
}
1159
#[cfg(test)]
mod tests {
    use super::*;

    /// The timestamp is 20 characters long, contains the `T` separator and ends
    /// in `Z`.
    #[test]
    fn iso_utc_now_is_plausible_shape() {
        let s = iso_utc_now();
        assert_eq!(s.len(), 20);
        assert!(s.ends_with('Z'));
        assert!(s.contains('T'));
    }

    /// `tail` trims surrounding whitespace and never returns more than `max_bytes`.
    #[test]
    fn tail_trims_and_bounds() {
        assert_eq!(tail(" hi ", 1024), "hi");
        let long: String = "x".repeat(5000);
        assert_eq!(tail(&long, 10).len(), 10);
    }

    /// The wire path uses forward slashes and the fixed `.omne/var/runs/...` layout.
    #[test]
    fn output_path_wire_uses_forward_slashes() {
        let wire = capture_output_path_wire("feature-01abc", "research");
        assert!(!wire.contains('\\'));
        assert_eq!(wire, ".omne/var/runs/feature-01abc/nodes/research.out");
    }
}