#![allow(dead_code)]
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
use ulid::Ulid;
use uuid::Uuid;
use crate::claude_proc::{self, CaptureMode, ChildKiller, ClaudeProcess, Session, SpawnOpts};
use crate::event_log::EventLog;
use crate::events::{
ErrorKind, Event, GateMethod, GatePassed, Input, IterationStarted, NodeCompleted, NodeError,
NodeFailed, NodeKind, NodeStarted,
};
use crate::pipe::{ExecutionKind, LoopBody, Node};
use crate::sentinel::{self, Scanner};
use crate::volume;
/// Default wall-clock budget for a node (1800 seconds); `ExecutorContext::new`
/// installs it as `default_node_timeout`.
pub const DEFAULT_NODE_TIMEOUT: Duration = Duration::from_secs(1800);
/// Default wall-clock budget for a gate hook (60 seconds); `ExecutorContext::new`
/// installs it as `gate_timeout`.
pub const GATE_TIMEOUT: Duration = Duration::from_secs(60);
/// Borrowed per-run state handed to every node dispatch.
pub struct ExecutorContext<'a> {
/// Root of the volume; gate hooks and node capture files are resolved beneath it.
pub volume_root: &'a Path,
/// Identifier of the current run, stamped on every emitted event.
pub run_id: &'a str,
/// Working directory for bash and AI node subprocesses.
pub worktree: &'a Path,
/// Log that receives the node lifecycle events.
pub event_log: &'a EventLog,
/// Run inputs, exported to bash and AI subprocesses as `OMNE_INPUT_<KEY>`.
pub inputs: &'a [Input],
/// Model used when a node does not set its own.
pub default_model: Option<&'a str>,
/// Optional claude binary path, forwarded to the spawner as `SpawnOpts::bin`.
pub claude_bin: Option<&'a Path>,
/// Budget applied to nodes that do not declare their own `timeout`.
pub default_node_timeout: Duration,
/// Budget applied to a single gate hook execution.
pub gate_timeout: Duration,
}
impl<'a> ExecutorContext<'a> {
pub fn new(
volume_root: &'a Path,
run_id: &'a str,
worktree: &'a Path,
event_log: &'a EventLog,
inputs: &'a [Input],
) -> Self {
Self {
volume_root,
run_id,
worktree,
event_log,
inputs,
default_model: None,
claude_bin: None,
default_node_timeout: DEFAULT_NODE_TIMEOUT,
gate_timeout: GATE_TIMEOUT,
}
}
}
/// Terminal result of dispatching one node (body plus optional gate).
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum NodeOutcome {
/// The body succeeded and, if a gate was configured, the gate passed.
Completed,
/// The body or the gate failed.
Failed {
/// Failure classification recorded on the `NodeFailed` event.
kind: ErrorKind,
/// Optional human-readable detail (for example a stderr tail).
message: Option<String>,
},
}
/// Runs one node end to end and records its lifecycle in the event log.
///
/// Appends a `NodeStarted` event, executes the body (and gate, if any) through
/// `body_with_gate`, appends the matching terminal event through
/// `emit_terminal`, and returns the outcome.
///
/// # Errors
///
/// Returns `DispatchError::InvalidNode` when the node reports no execution
/// kind, and propagates event-log failures from either append.
pub fn dispatch(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
    let kind = match node.execution_kind() {
        Some(kind) => kind,
        None => {
            return Err(DispatchError::InvalidNode {
                node_id: node.id.clone(),
                reason: "node has no execution kind (validator should have caught this)".into(),
            });
        }
    };

    // Translate the pipeline's execution kind into the event vocabulary.
    let event_kind = match kind {
        ExecutionKind::Loop => NodeKind::Loop,
        ExecutionKind::Bash => NodeKind::Bash,
        ExecutionKind::Prompt => NodeKind::Prompt,
        ExecutionKind::Command => NodeKind::Command,
    };

    let started = NodeStarted {
        id: new_event_id(),
        ts: iso_utc_now(),
        run_id: ctx.run_id.to_string(),
        node_id: node.id.clone(),
        kind: event_kind,
        name: None,
        model: effective_model(node, ctx).map(str::to_string),
    };
    ctx.event_log.append(&Event::NodeStarted(started))?;

    let outcome = body_with_gate(node, kind, ctx);
    emit_terminal(node, &outcome, ctx)?;
    Ok(outcome)
}
/// Executes the node body and, when it completes, the node's gate.
///
/// Dispatch errors never escape this function: an error from the body is
/// reported as a `Crash` failure and an error from the gate as `GateFailed`.
/// The gate only runs when the body returned `NodeOutcome::Completed`.
fn body_with_gate(node: &Node, kind: ExecutionKind, ctx: &ExecutorContext<'_>) -> NodeOutcome {
    let body_result = match kind {
        ExecutionKind::Loop => run_loop(node, ctx),
        ExecutionKind::Bash => run_bash(node, ctx),
        ExecutionKind::Prompt | ExecutionKind::Command => run_ai(node, ctx),
    };

    match body_result {
        Err(e) => NodeOutcome::Failed {
            kind: ErrorKind::Crash,
            message: Some(format!("dispatch error in node body: {e}")),
        },
        Ok(NodeOutcome::Completed) => match node.gate.as_deref() {
            None => NodeOutcome::Completed,
            Some(gate) => run_gate(node, gate, ctx).unwrap_or_else(|e| NodeOutcome::Failed {
                kind: ErrorKind::GateFailed,
                message: Some(format!("dispatch error in gate: {e}")),
            }),
        },
        // A failed body is reported as-is; the gate is skipped.
        Ok(failed) => failed,
    }
}
fn run_bash(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
let bash_body = node
.bash
.as_deref()
.ok_or_else(|| DispatchError::InvalidNode {
node_id: node.id.clone(),
reason: "bash node missing bash: body".into(),
})?;
let capture_path = node_capture_path(ctx, &node.id);
ensure_parent_dir(&capture_path)?;
let mut cmd = bash_command(bash_body);
cmd.current_dir(ctx.worktree)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for input in ctx.inputs {
let env_key = format!("OMNE_INPUT_{}", input.key.to_uppercase());
cmd.env(env_key, &input.value);
}
let budget = node_timeout(node, ctx);
let outcome = run_command_with_timeout(&mut cmd, budget, Some(&capture_path))?;
Ok(outcome_from_exit(outcome))
}
/// Builds the shell invocation for a bash node body on Windows.
///
/// Produces `cmd /S /C "<body>"`. The whole argument string goes through
/// `raw_arg`, so the standard library applies no quoting or escaping to it.
#[cfg(windows)]
fn bash_command(body: &str) -> Command {
use std::os::windows::process::CommandExt;
let mut cmd = Command::new("cmd");
// The body is wrapped in one pair of double quotes and passed verbatim.
cmd.raw_arg(format!("/S /C \"{body}\""));
cmd
}
/// Builds the shell invocation for a bash node body on non-Windows hosts:
/// `sh -c <body>`, with the body passed as a single argument.
#[cfg(not(windows))]
fn bash_command(body: &str) -> Command {
    let mut shell = Command::new("sh");
    shell.args(["-c", body]);
    shell
}
/// Runs a single-shot AI node (`prompt:` or `command:`).
///
/// The capture file is truncated for this run and no `until` tokens are
/// supplied, so any sentinel hit reported by the iteration is treated as the
/// assistant emitting BLOCKED.
///
/// # Errors
///
/// Propagates prompt-resolution, directory-creation and subprocess errors.
fn run_ai(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
    let prompt = ai_prompt_for(node)?;
    let capture_path = node_capture_path(ctx, &node.id);
    ensure_parent_dir(&capture_path)?;

    let opts = build_spawn_opts(node, ctx, prompt, None);
    let budget = node_timeout(node, ctx);

    let node_outcome =
        match run_claude_iteration(&opts, &capture_path, CaptureMode::Truncate, budget, &[])? {
            ClaudeOutcome::CleanExit => NodeOutcome::Completed,
            ClaudeOutcome::SentinelHit { .. } => NodeOutcome::Failed {
                kind: ErrorKind::Blocked,
                message: Some("assistant emitted BLOCKED".into()),
            },
            failure => claude_failure(failure, node, budget),
        };
    Ok(node_outcome)
}
/// Maps a failing `ClaudeOutcome` to the `NodeOutcome::Failed` reported for
/// `node`. `budget` is only used to word the timeout message.
///
/// # Panics
///
/// Panics when called with `CleanExit` or `SentinelHit`; callers must handle
/// those outcomes themselves.
fn claude_failure(outcome: ClaudeOutcome, node: &Node, budget: Duration) -> NodeOutcome {
    let (kind, message) = match outcome {
        ClaudeOutcome::Crash { stderr_tail } => (ErrorKind::Crash, stderr_tail),
        ClaudeOutcome::HostMissing => (
            ErrorKind::HostMissing,
            String::from("claude binary not found on PATH"),
        ),
        ClaudeOutcome::Timeout => (
            ErrorKind::Timeout,
            format!(
                "node {} exceeded timeout of {}s",
                node.id,
                budget.as_secs()
            ),
        ),
        ClaudeOutcome::SentinelHit { .. } | ClaudeOutcome::CleanExit => {
            unreachable!("claude_failure invoked on a non-failure outcome");
        }
    };
    NodeOutcome::Failed {
        kind,
        message: Some(message),
    }
}
/// Resolves the prompt text for a single-shot AI node.
///
/// `prompt:` wins over `command:`; a command is sent as `/<command>`.
///
/// # Errors
///
/// `DispatchError::InvalidNode` when the node has neither field, with a
/// distinct reason when the node is actually a loop.
fn ai_prompt_for(node: &Node) -> Result<String, DispatchError> {
    let reason = match (&node.prompt, &node.command) {
        (Some(prompt), _) => return Ok(prompt.clone()),
        (None, Some(command)) => return Ok(format!("/{command}")),
        (None, None) if node.loop_.is_some() => {
            "loop body should route through run_loop, not run_ai"
        }
        (None, None) => "AI node carried neither prompt: nor command:",
    };
    Err(DispatchError::InvalidNode {
        node_id: node.id.clone(),
        reason: reason.into(),
    })
}
/// Runs a loop node: repeats the loop body prompt until the assistant emits the
/// `until` token, emits BLOCKED, fails, the shared wall-clock budget runs out,
/// or `max_iterations` is reached.
///
/// All iterations append to one capture file. Each iteration is preceded by a
/// marker line and an `IterationStarted` event.
///
/// # Errors
///
/// `DispatchError::InvalidNode` for a non-loop node, a loop body without a
/// prompt/command, or an unexpected sentinel token; also propagates I/O,
/// event-log and subprocess errors.
fn run_loop(node: &Node, ctx: &ExecutorContext<'_>) -> Result<NodeOutcome, DispatchError> {
let body = node
.loop_
.as_ref()
.ok_or_else(|| DispatchError::InvalidNode {
node_id: node.id.clone(),
reason: "loop dispatch on a non-loop node".into(),
})?;
let prompt = loop_body_prompt(node, body)?;
let capture_path = node_capture_path(ctx, &node.id);
ensure_parent_dir(&capture_path)?;
// One session id is shared by every iteration unless each iteration is
// supposed to start from a fresh context.
let session_uuid = if !body.fresh_context {
Some(Uuid::new_v4().to_string())
} else {
None
};
let until_tokens: Vec<String> = vec![body.until.clone()];
// The node budget covers the whole loop, not each iteration.
let budget = node_timeout(node, ctx);
let deadline = Instant::now() + budget;
for iter in 1..=body.max_iterations {
// Whatever is left of the shared budget is this iteration's budget.
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
return Ok(NodeOutcome::Failed {
kind: ErrorKind::Timeout,
message: Some(format!(
"loop {} exceeded timeout of {}s",
node.id,
budget.as_secs()
)),
});
}
// Start from an empty capture file; later iterations append to it.
if iter == 1 {
truncate_capture(&capture_path)?;
}
// `byte_offset` is the capture file length right after the marker line.
let byte_offset = write_iteration_marker(&capture_path, iter)?;
ctx.event_log
.append(&Event::IterationStarted(IterationStarted {
id: new_event_id(),
ts: iso_utc_now(),
run_id: ctx.run_id.to_string(),
node_id: node.id.clone(),
iteration: iter,
byte_offset,
}))
.map_err(DispatchError::from)?;
// The first iteration creates the session, later ones resume it.
let session = session_uuid.as_ref().map(|uuid| {
if iter == 1 {
Session::New(uuid.clone())
} else {
Session::Resume(uuid.clone())
}
});
let opts = build_spawn_opts(node, ctx, prompt.clone(), session);
let outcome = run_claude_iteration(
&opts,
&capture_path,
CaptureMode::Append,
remaining,
&until_tokens,
)?;
match outcome {
ClaudeOutcome::SentinelHit { token } => {
// NOTE(review): only `until` is passed to the scanner, so it presumably
// also watches for `sentinel::BLOCKED` on its own — confirm in `Scanner`.
if token == sentinel::BLOCKED {
return Ok(NodeOutcome::Failed {
kind: ErrorKind::Blocked,
message: Some(format!(
"loop {} assistant emitted BLOCKED on iteration {iter}",
node.id
)),
});
}
if token == body.until {
return Ok(NodeOutcome::Completed);
}
return Err(DispatchError::InvalidNode {
node_id: node.id.clone(),
reason: format!("scanner returned unexpected token `{token}`"),
});
}
ClaudeOutcome::CleanExit => {
// No sentinel seen: fall through to the next iteration.
}
failure => return Ok(claude_failure(failure, node, budget)),
}
}
// Every iteration exited cleanly without ever emitting the `until` token.
Ok(NodeOutcome::Failed {
kind: ErrorKind::MaxIterationsExceeded,
message: Some(format!(
"loop {} exhausted {} iterations without matching `{}`",
node.id, body.max_iterations, body.until
)),
})
}
/// Resolves the prompt sent on every iteration of a loop node.
///
/// The body's `prompt:` wins over its `command:`; a command is sent as
/// `/<command>`.
///
/// # Errors
///
/// `DispatchError::InvalidNode` when the loop body has neither field.
fn loop_body_prompt(node: &Node, body: &LoopBody) -> Result<String, DispatchError> {
    match (&body.prompt, &body.command) {
        (Some(prompt), _) => Ok(prompt.clone()),
        (None, Some(command)) => Ok(format!("/{command}")),
        (None, None) => Err(DispatchError::InvalidNode {
            node_id: node.id.clone(),
            reason: "loop body carries neither prompt: nor command:".into(),
        }),
    }
}
const ITERATION_MARKER_PREFIX: &str = "=== omne:iteration:";
fn write_iteration_marker(capture_path: &Path, iter: u32) -> Result<u64, DispatchError> {
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(capture_path)
.map_err(|source| DispatchError::Io {
path: capture_path.to_path_buf(),
source,
})?;
writeln!(f, "\n{ITERATION_MARKER_PREFIX}{iter} ===").map_err(|source| DispatchError::Io {
path: capture_path.to_path_buf(),
source,
})?;
f.sync_data().map_err(|source| DispatchError::Io {
path: capture_path.to_path_buf(),
source,
})?;
let meta = std::fs::metadata(capture_path).map_err(|source| DispatchError::Io {
path: capture_path.to_path_buf(),
source,
})?;
Ok(meta.len())
}
/// Creates the capture file if it is missing and empties it if it exists.
///
/// # Errors
///
/// `DispatchError::Io` (carrying `capture_path`) when the file cannot be opened.
fn truncate_capture(capture_path: &Path) -> Result<(), DispatchError> {
    // `File::create` opens write-only, creating or truncating as needed.
    match std::fs::File::create(capture_path) {
        Ok(_file) => Ok(()),
        Err(source) => Err(DispatchError::Io {
            path: capture_path.to_path_buf(),
            source,
        }),
    }
}
/// Result of one claude subprocess run, as classified by `run_claude_iteration`.
enum ClaudeOutcome {
/// The process ended successfully without a sentinel hit.
CleanExit,
/// The scanner matched `token` in the output and the process was killed.
SentinelHit { token: String },
/// The watchdog killed the process after the budget elapsed.
Timeout,
/// The spawner reported that the claude binary is missing.
HostMissing,
/// The output stream errored or the process exited unsuccessfully.
Crash { stderr_tail: String },
}
/// Spawns one claude subprocess, streams its output through a sentinel scanner
/// and classifies how it ended.
///
/// A watchdog thread kills the process once `budget` elapses. When several
/// conditions hold at once the result is chosen in this order: sentinel hit,
/// timeout, stream error, unsuccessful exit status, clean exit.
///
/// # Errors
///
/// Propagates spawn errors other than a missing host binary, and errors from
/// wrapping or finishing the process.
fn run_claude_iteration(
opts: &SpawnOpts,
capture_path: &Path,
capture_mode: CaptureMode,
budget: Duration,
until_tokens: &[String],
) -> Result<ClaudeOutcome, DispatchError> {
// A missing binary is a reportable outcome, not a dispatch error.
let child = match claude_proc::spawn(opts) {
Ok(c) => c,
Err(claude_proc::Error::HostMissing) => return Ok(ClaudeOutcome::HostMissing),
Err(other) => return Err(DispatchError::from(other)),
};
let proc = ClaudeProcess::from_child_with_mode(child, capture_path, capture_mode)?;
let killer = proc.killer();
// Nothing is ever sent on this channel; dropping `cancel_tx` is the cancel signal.
let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
let watchdog = spawn_watchdog(killer.clone(), cancel_rx, budget);
let scanner = Scanner::new(until_tokens);
let mut proc = proc;
let mut hit: Option<String> = None;
let mut stream_error: Option<claude_proc::Error> = None;
for line in proc.by_ref() {
match line {
Ok(al) => {
if let Some(h) = scanner.feed(&al.text) {
hit = Some(h.token);
// Stop the process as soon as a sentinel is seen; the kill result is ignored.
let _ = killer.kill();
break;
}
}
Err(e) => {
stream_error = Some(e);
break;
}
}
}
// Disconnect the channel so a watchdog that has not fired yet returns `false`.
drop(cancel_tx);
// A panicked watchdog thread is treated as "did not time out".
let timed_out = watchdog.join().unwrap_or(false);
let (status, stderr) = match proc.finish() {
Ok(s) => s,
Err(e) => {
return Err(DispatchError::from(e));
}
};
// A sentinel hit wins even if the watchdog also fired.
if let Some(token) = hit {
return Ok(ClaudeOutcome::SentinelHit { token });
}
if timed_out {
return Ok(ClaudeOutcome::Timeout);
}
if let Some(e) = stream_error {
return Ok(ClaudeOutcome::Crash {
stderr_tail: format!("stream error: {e}\nstderr: {}", tail(&stderr, 1024)),
});
}
if !status.success() {
return Ok(ClaudeOutcome::Crash {
stderr_tail: tail(&stderr, 1024),
});
}
Ok(ClaudeOutcome::CleanExit)
}
/// Starts a thread that kills the process behind `killer` if `budget` elapses
/// before the channel is signalled or disconnected.
///
/// The thread's result is `true` only when the timeout fired (the kill result
/// itself is ignored); a message or a dropped sender yields `false`.
fn spawn_watchdog(
    killer: ChildKiller,
    rx: mpsc::Receiver<()>,
    budget: Duration,
) -> thread::JoinHandle<bool> {
    thread::spawn(move || {
        let budget_elapsed = matches!(
            rx.recv_timeout(budget),
            Err(mpsc::RecvTimeoutError::Timeout)
        );
        if budget_elapsed {
            let _ = killer.kill();
        }
        budget_elapsed
    })
}
fn build_spawn_opts(
node: &Node,
ctx: &ExecutorContext<'_>,
prompt: String,
session: Option<Session>,
) -> SpawnOpts {
SpawnOpts {
prompt,
cwd: ctx.worktree.to_path_buf(),
model: effective_model(node, ctx).map(str::to_string),
allowed_tools: node.allowed_tools.clone(),
session,
extra_args: Vec::new(),
env_vars: build_ai_env_vars(node, ctx),
bin: ctx.claude_bin.map(|p| p.to_path_buf()),
}
}
/// Builds the environment passed to an AI node's subprocess.
///
/// The list always starts with `OMNE_RUN_ID`, `OMNE_NODE_ID` and
/// `OMNE_VOLUME_ROOT` (lossily converted to a string), followed by one
/// `OMNE_INPUT_<KEY>` entry per run input, in input order.
fn build_ai_env_vars(node: &Node, ctx: &ExecutorContext<'_>) -> Vec<(String, String)> {
    let mut env: Vec<(String, String)> = Vec::with_capacity(3 + ctx.inputs.len());

    let base_vars = [
        ("OMNE_RUN_ID", ctx.run_id.to_string()),
        ("OMNE_NODE_ID", node.id.clone()),
        (
            "OMNE_VOLUME_ROOT",
            ctx.volume_root.to_string_lossy().into_owned(),
        ),
    ];
    for (name, value) in base_vars {
        env.push((name.to_string(), value));
    }

    env.extend(ctx.inputs.iter().map(|input| {
        (
            format!("OMNE_INPUT_{}", input.key.to_uppercase()),
            input.value.clone(),
        )
    }));
    env
}
/// Returns the model a node should run with: the node's own `model` if set,
/// otherwise the context's `default_model`, otherwise `None`.
fn effective_model<'a>(node: &'a Node, ctx: &'a ExecutorContext<'a>) -> Option<&'a str> {
    match node.model.as_deref() {
        Some(node_model) => Some(node_model),
        None => ctx.default_model,
    }
}
/// Raw result of a subprocess run by `run_command_with_timeout`.
struct RawExit {
/// Exit status reported by the final wait on the child.
status: std::process::ExitStatus,
/// Everything collected from the child's stdout pipe (empty if not piped).
stdout: Vec<u8>,
/// Everything collected from the child's stderr pipe (empty if not piped).
stderr: Vec<u8>,
/// `true` when the child was killed for exceeding its budget.
timed_out: bool,
}
/// Configures `cmd` so the spawned child becomes the leader of a new process
/// group (process group id 0 means "use the child's own pid").
#[cfg(unix)]
fn set_new_process_group(cmd: &mut Command) {
    std::os::unix::process::CommandExt::process_group(cmd, 0);
}
/// Configures `cmd` so the spawned child is created with the
/// `CREATE_NEW_PROCESS_GROUP` creation flag.
#[cfg(windows)]
fn set_new_process_group(cmd: &mut Command) {
use std::os::windows::process::CommandExt;
// Win32 process creation flag value.
const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
cmd.creation_flags(CREATE_NEW_PROCESS_GROUP);
}
/// Best-effort SIGKILL of the whole process group led by `pid`.
///
/// Runs `kill -KILL -- -<pid>` with all standard streams detached; any failure
/// to run the command, or a non-zero exit from it, is ignored.
#[cfg(unix)]
fn kill_process_tree(pid: u32) {
    // A negative pid addresses the process group instead of a single process.
    let group_target = format!("-{pid}");
    let mut kill = Command::new("kill");
    kill.arg("-KILL")
        .arg("--")
        .arg(&group_target)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    let _ = kill.status();
}
/// Best-effort forced termination of `pid` and its child processes on Windows.
///
/// Runs `taskkill /T /F /PID <pid>` with all standard streams detached; the
/// command's result is ignored.
#[cfg(windows)]
fn kill_process_tree(pid: u32) {
let _ = Command::new("taskkill")
.args(["/T", "/F", "/PID", &pid.to_string()])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
}
/// Reads `pipe` to EOF on a background thread and returns everything read.
///
/// Read errors are not fatal: `read_to_end` leaves the bytes it already read in
/// the buffer, and those are returned instead of being thrown away.
fn drain_pipe_in_background<R>(mut pipe: R) -> thread::JoinHandle<Vec<u8>>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || {
        let mut buf = Vec::new();
        let _ = pipe.read_to_end(&mut buf);
        buf
    })
}

/// Spawns `cmd` in its own process group, waits at most `budget` for it, and
/// returns its exit status together with the collected stdout/stderr.
///
/// When the budget is exceeded the whole process group is killed and
/// `timed_out` is set on the result. If `capture_stdout_at` is given, the
/// collected stdout is written to that path (also after a timeout).
///
/// Pipes are drained on background threads so a chatty child cannot block on a
/// full pipe while this function waits for it.
///
/// # Errors
///
/// * `DispatchError::Spawn` when the process cannot be started.
/// * `DispatchError::Wait` when waiting fails; the child and its group are
///   killed first so a failed wait does not leave them running.
/// * `DispatchError::Io` when the capture file cannot be written.
fn run_command_with_timeout(
    cmd: &mut Command,
    budget: Duration,
    capture_stdout_at: Option<&Path>,
) -> Result<RawExit, DispatchError> {
    use wait_timeout::ChildExt;

    set_new_process_group(cmd);
    let mut child = cmd
        .spawn()
        .map_err(|source| DispatchError::Spawn { source })?;
    let child_pid = child.id();

    let stdout_thread = child.stdout.take().map(drain_pipe_in_background);
    let stderr_thread = child.stderr.take().map(drain_pipe_in_background);

    let waited = match child.wait_timeout(budget) {
        Ok(waited) => waited,
        Err(source) => {
            // Do not leave an unsupervised process tree behind. Everything
            // here is best-effort and non-blocking so the original error is
            // still reported promptly.
            kill_process_tree(child_pid);
            let _ = child.kill();
            let _ = child.try_wait();
            return Err(DispatchError::Wait { source });
        }
    };

    let (status, timed_out) = match waited {
        Some(status) => (status, false),
        None => {
            // Kill the group first (covers grandchildren), then the child
            // itself in case the group kill did not reach it.
            kill_process_tree(child_pid);
            let _ = child.kill();
            let status = child
                .wait()
                .map_err(|source| DispatchError::Wait { source })?;
            (status, true)
        }
    };

    // A panicked reader thread simply contributes no output.
    let stdout = stdout_thread
        .map(|handle| handle.join().unwrap_or_default())
        .unwrap_or_default();
    let stderr = stderr_thread
        .map(|handle| handle.join().unwrap_or_default())
        .unwrap_or_default();

    if let Some(cap) = capture_stdout_at {
        std::fs::write(cap, &stdout).map_err(|source| DispatchError::Io {
            path: cap.to_path_buf(),
            source,
        })?;
    }

    Ok(RawExit {
        status,
        stdout,
        stderr,
        timed_out,
    })
}
/// Classifies a finished bash subprocess.
///
/// A timed-out run is a `Timeout` failure (message includes the untrimmed
/// stderr tail), a successful status is `Completed`, and anything else is a
/// `Crash` whose message is the trimmed stderr tail. The tail is the last
/// 1024 bytes, decoded lossily.
fn outcome_from_exit(raw: RawExit) -> NodeOutcome {
    let stderr_tail = || String::from_utf8_lossy(&tail_bytes(&raw.stderr, 1024)).into_owned();

    if raw.timed_out {
        NodeOutcome::Failed {
            kind: ErrorKind::Timeout,
            message: Some(format!(
                "subprocess killed after exceeding wall-clock budget; stderr: {}",
                stderr_tail()
            )),
        }
    } else if raw.status.success() {
        NodeOutcome::Completed
    } else {
        NodeOutcome::Failed {
            kind: ErrorKind::Crash,
            message: Some(stderr_tail().trim().to_string()),
        }
    }
}
/// Runs the gate hook `gate` for `node` and reports whether the node may be
/// considered completed.
///
/// The hook script is resolved under the volume's hooks directory, checked
/// against the hooks boundary, and executed from the volume root with
/// `OMNE_RUN_ID`, `OMNE_NODE_ID`, `OMNE_GATE_NAME` and `OMNE_VOLUME_ROOT` set.
/// A passing hook appends a `GatePassed` event carrying the trimmed tail of its
/// stdout (if any).
///
/// Failure outcomes: `GateFailed` for a missing script, a boundary violation or
/// an unsuccessful exit; `GateTimeout` when the gate budget is exceeded.
///
/// # Errors
///
/// Propagates boundary-check I/O errors, subprocess errors and event-log errors.
fn run_gate(
    node: &Node,
    gate: &str,
    ctx: &ExecutorContext<'_>,
) -> Result<NodeOutcome, DispatchError> {
    let gate_failed = |message: String| NodeOutcome::Failed {
        kind: ErrorKind::GateFailed,
        message: Some(message),
    };
    // Lossy, whitespace-trimmed view of the last 1024 bytes of a stream.
    let trimmed_tail = |bytes: &[u8]| {
        String::from_utf8_lossy(&tail_bytes(bytes, 1024))
            .trim()
            .to_string()
    };

    let script = gate_script_path(ctx.volume_root, gate);
    if !script.is_file() {
        return Ok(gate_failed(format!(
            "gate hook {} missing at dispatch time",
            script.display()
        )));
    }
    if let Some(rejection) = enforce_gate_boundary(&script, ctx.volume_root, gate)? {
        return Ok(rejection);
    }

    let mut cmd = gate_command(&script);
    cmd.current_dir(ctx.volume_root);
    cmd.stdin(Stdio::null());
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());
    cmd.env("OMNE_RUN_ID", ctx.run_id);
    cmd.env("OMNE_NODE_ID", &node.id);
    cmd.env("OMNE_GATE_NAME", gate);
    cmd.env("OMNE_VOLUME_ROOT", ctx.volume_root);

    let raw = run_command_with_timeout(&mut cmd, ctx.gate_timeout, None)?;
    if raw.timed_out {
        return Ok(NodeOutcome::Failed {
            kind: ErrorKind::GateTimeout,
            message: Some(format!(
                "gate {gate} exceeded {}s budget",
                ctx.gate_timeout.as_secs()
            )),
        });
    }
    if !raw.status.success() {
        return Ok(gate_failed(trimmed_tail(raw.stderr.as_slice())));
    }

    // Empty or whitespace-only stdout is recorded as "no stdout".
    let stdout_tail = Some(trimmed_tail(raw.stdout.as_slice())).filter(|text| !text.is_empty());
    let passed = GatePassed {
        id: new_event_id(),
        ts: iso_utc_now(),
        run_id: ctx.run_id.to_string(),
        node_id: node.id.clone(),
        gate: gate.to_string(),
        method: GateMethod::Hook,
        stdout: stdout_tail,
    };
    ctx.event_log.append(&Event::GatePassed(passed))?;
    Ok(NodeOutcome::Completed)
}
/// Verifies that the resolved gate script lives inside the volume's hooks
/// directory.
///
/// Both paths are canonicalized (resolving symlinks and `..`) before the
/// prefix comparison. Returns `Ok(None)` when the script is inside the hooks
/// directory and `Ok(Some(failure))` with a `GateFailed` outcome otherwise.
///
/// The check fails closed: if the hooks directory itself cannot be
/// canonicalized, the gate is rejected. Skipping the check in that case would
/// let a gate name that does not resolve under the hooks directory (for
/// example an absolute path, which makes `join` discard the hooks prefix) run
/// an arbitrary script.
///
/// # Errors
///
/// `DispatchError::Io` when the script path cannot be canonicalized.
fn enforce_gate_boundary(
    script: &Path,
    volume_root: &Path,
    gate: &str,
) -> Result<Option<NodeOutcome>, DispatchError> {
    let hooks_dir = volume::dist_dir(volume_root).join("hooks");
    let canonical_hooks = match hooks_dir.canonicalize() {
        Ok(p) => p,
        Err(e) => {
            // No resolvable hooks directory means containment cannot be
            // proven, so the gate must not run.
            return Ok(Some(NodeOutcome::Failed {
                kind: ErrorKind::GateFailed,
                message: Some(format!(
                    "gate `{gate}` rejected: hooks directory {} cannot be resolved: {e}",
                    hooks_dir.display()
                )),
            }));
        }
    };
    let canonical_script = script.canonicalize().map_err(|source| DispatchError::Io {
        path: script.to_path_buf(),
        source,
    })?;
    if !canonical_script.starts_with(&canonical_hooks) {
        return Ok(Some(NodeOutcome::Failed {
            kind: ErrorKind::GateFailed,
            message: Some(format!(
                "gate `{gate}` resolves outside {}: {}",
                canonical_hooks.display(),
                canonical_script.display()
            )),
        }));
    }
    Ok(None)
}
/// Returns the expected location of the hook script for `gate`:
/// `<dist dir>/hooks/<gate>.<platform extension>`.
fn gate_script_path(volume_root: &Path, gate: &str) -> PathBuf {
    let file_name = format!("{gate}.{}", platform_hook_extension());
    let hooks_dir = volume::dist_dir(volume_root).join("hooks");
    hooks_dir.join(file_name)
}
/// Builds the command that runs a gate hook on Windows:
/// `powershell -NoProfile -File <script>`.
#[cfg(windows)]
fn gate_command(script: &Path) -> Command {
let mut cmd = Command::new("powershell");
cmd.arg("-NoProfile").arg("-File").arg(script);
cmd
}
/// Builds the command that runs a gate hook on non-Windows hosts:
/// `sh <script>`. The script therefore does not need its executable bit set.
#[cfg(not(windows))]
fn gate_command(script: &Path) -> Command {
    let mut shell = Command::new("sh");
    shell.args([script]);
    shell
}
/// File extension of gate hook scripts on Windows (PowerShell scripts).
#[cfg(windows)]
fn platform_hook_extension() -> &'static str {
"ps1"
}
/// File extension of gate hook scripts on non-Windows hosts (shell scripts).
#[cfg(not(windows))]
fn platform_hook_extension() -> &'static str {
    const SHELL_HOOK_EXTENSION: &str = "sh";
    SHELL_HOOK_EXTENSION
}
/// Appends the terminal event for `node` to the event log: `NodeCompleted`
/// (with the wire path of the capture file) for a completed outcome,
/// `NodeFailed` (with error kind and message) otherwise.
///
/// # Errors
///
/// Propagates the event-log error when the append fails.
fn emit_terminal(
    node: &Node,
    outcome: &NodeOutcome,
    ctx: &ExecutorContext<'_>,
) -> Result<(), DispatchError> {
    // Fields shared by both terminal events.
    let id = new_event_id();
    let ts = iso_utc_now();
    let run_id = ctx.run_id.to_string();
    let node_id = node.id.clone();

    let event = match outcome {
        NodeOutcome::Failed { kind, message } => Event::NodeFailed(NodeFailed {
            id,
            ts,
            run_id,
            node_id,
            error: NodeError { kind: *kind },
            message: message.clone(),
        }),
        NodeOutcome::Completed => Event::NodeCompleted(NodeCompleted {
            id,
            ts,
            run_id,
            node_id,
            output_path: capture_output_path_wire(ctx.run_id, &node.id),
        }),
    };

    ctx.event_log.append(&event)?;
    Ok(())
}
/// Filesystem path of a node's capture file: `<nodes dir of the run>/<node_id>.out`.
fn node_capture_path(ctx: &ExecutorContext<'_>, node_id: &str) -> PathBuf {
    let file_name = format!("{node_id}.out");
    let nodes_dir = volume::nodes_dir(ctx.volume_root, ctx.run_id);
    nodes_dir.join(file_name)
}
/// Path of a node's capture file as it is written into events; delegates to
/// `volume::node_capture_wire_path`.
fn capture_output_path_wire(run_id: &str, node_id: &str) -> String {
    let wire_path = volume::node_capture_wire_path(run_id, node_id);
    wire_path
}
/// Creates the parent directory of `path` (and any missing ancestors).
///
/// Does nothing when `path` has no parent or the parent is the empty path
/// (a bare file name).
///
/// # Errors
///
/// `DispatchError::Io` carrying the parent directory when creation fails.
fn ensure_parent_dir(path: &Path) -> Result<(), DispatchError> {
    let parent = match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => return Ok(()),
    };
    std::fs::create_dir_all(parent).map_err(|source| DispatchError::Io {
        path: parent.to_path_buf(),
        source,
    })
}
/// Wall-clock budget for `node`: its own `timeout` (in seconds) when set,
/// otherwise the context's `default_node_timeout`.
fn node_timeout(node: &Node, ctx: &ExecutorContext<'_>) -> Duration {
    match node.timeout {
        Some(secs) => Duration::from_secs(secs),
        None => ctx.default_node_timeout,
    }
}
/// Generates a fresh event id: a new ULID rendered as a lower-case string.
fn new_event_id() -> String {
    let ulid = Ulid::new();
    let rendered = ulid.to_string();
    rendered.to_lowercase()
}
/// Current time from the crate clock, formatted by its `format_iso_utc`.
fn iso_utc_now() -> String {
    let now = crate::clock::now_utc();
    now.format_iso_utc()
}
/// Returns at most the last `max_bytes` bytes of `b`, without starting in the
/// middle of a UTF-8 character.
///
/// If the cut lands on UTF-8 continuation bytes (`0b10xx_xxxx`), they are
/// skipped so the result starts on a character boundary; the result may
/// therefore be slightly shorter than `max_bytes`. A UTF-8 character has at
/// most three continuation bytes, so at most three bytes are skipped. Input
/// that is not UTF-8 (for example a long run of bytes in `0x80..=0xBF`) can
/// therefore never have its whole tail discarded.
fn tail_bytes(b: &[u8], max_bytes: usize) -> Vec<u8> {
    /// Longest possible run of continuation bytes in well-formed UTF-8.
    const MAX_CONTINUATION_BYTES: usize = 3;

    if b.len() <= max_bytes {
        return b.to_vec();
    }
    let cut = b.len() - max_bytes;
    let skipped = b[cut..]
        .iter()
        .take(MAX_CONTINUATION_BYTES)
        .take_while(|&&byte| (byte & 0b1100_0000) == 0b1000_0000)
        .count();
    b[cut + skipped..].to_vec()
}
/// Returns the whitespace-trimmed last `max_bytes` bytes of `s`.
///
/// If the cut would land inside a multi-byte character, the start moves
/// forward to the next character boundary, so the result can be a little
/// shorter than `max_bytes`.
fn tail(s: &str, max_bytes: usize) -> String {
    let mut start = s.len().saturating_sub(max_bytes);
    // `s.len()` is always a boundary, so this terminates.
    while !s.is_char_boundary(start) {
        start += 1;
    }
    s[start..].trim().to_string()
}
/// Errors that abort a dispatch instead of being reported as a node outcome.
#[derive(Debug, thiserror::Error)]
pub enum DispatchError {
/// The node is malformed from the executor's point of view.
#[error("node `{node_id}` rejected by executor: {reason}")]
InvalidNode { node_id: String, reason: String },
/// Appending to the event log failed.
#[error("event log error: {0}")]
EventLog(#[from] crate::event_log::Error),
/// The claude subprocess layer reported an error.
#[error("claude subprocess error: {0}")]
ClaudeProc(#[from] claude_proc::Error),
/// A filesystem operation on `path` failed.
#[error("I/O error on {}: {source}", path.display())]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// A bash or gate subprocess could not be started.
#[error("failed to spawn subprocess: {source}")]
Spawn {
#[source]
source: std::io::Error,
},
/// Waiting for a bash or gate subprocess failed.
#[error("failed to wait on subprocess: {source}")]
Wait {
#[source]
source: std::io::Error,
},
}
// Unit tests for the pure helpers of the executor (formatting, tailing, wire
// paths and AI environment construction). No subprocess is spawned here.
#[cfg(test)]
mod tests {
use super::*;
use crate::event_log::EventLog;
use crate::events::Input;
use crate::pipe::{Node, TriggerRule};
use std::path::PathBuf;
use std::time::Duration;
use tempfile::TempDir;
// The timestamp is expected to be 20 characters long, contain a `T`
// separator and end with `Z`.
#[test]
fn iso_utc_now_is_plausible_shape() {
let s = iso_utc_now();
assert_eq!(s.len(), 20);
assert!(s.ends_with('Z'));
assert!(s.contains('T'));
}
// `tail` trims surrounding whitespace and never returns more than `max_bytes`.
#[test]
fn tail_trims_and_bounds() {
assert_eq!(tail(" hi ", 1024), "hi");
let long: String = "x".repeat(5000);
assert_eq!(tail(&long, 10).len(), 10);
}
// The wire path must be stable across platforms: forward slashes only.
#[test]
fn output_path_wire_uses_forward_slashes() {
let wire = capture_output_path_wire("feature-01abc", "research");
assert!(!wire.contains('\\'));
assert_eq!(wire, ".omne/var/runs/feature-01abc/nodes/research.out");
}
// Minimal `command:` node with every optional field left unset.
fn bare_ai_node(id: &str) -> Node {
Node {
id: id.into(),
depends_on: vec![],
model: None,
allowed_tools: vec![],
gate: None,
timeout: None,
trigger_rule: TriggerRule::AllSuccess,
command: Some("plan".into()),
prompt: None,
bash: None,
loop_: None,
}
}
// Base variables are always present and every input becomes an
// upper-cased `OMNE_INPUT_*` variable.
#[test]
fn build_ai_env_vars_contains_base_and_input_vars() {
let tmp = TempDir::new().unwrap();
let log = EventLog::for_run(tmp.path(), "feature-01abc").unwrap();
let volume_root: PathBuf = tmp.path().to_path_buf();
let worktree: PathBuf = tmp.path().join(".omne/wt/feature-01abc");
let inputs = vec![
Input {
key: "feature_name".into(),
value: "add-hello".into(),
},
Input {
key: "scope".into(),
value: "src/api".into(),
},
];
let ctx = ExecutorContext {
volume_root: &volume_root,
run_id: "feature-01abc",
worktree: &worktree,
event_log: &log,
inputs: &inputs,
default_model: None,
claude_bin: None,
default_node_timeout: Duration::from_secs(60),
gate_timeout: Duration::from_secs(60),
};
let node = bare_ai_node("plan");
let env = build_ai_env_vars(&node, &ctx);
// Looks up a variable by name in the returned list.
let lookup = |k: &str| env.iter().find(|(key, _)| key == k).map(|(_, v)| v.clone());
assert_eq!(lookup("OMNE_RUN_ID"), Some("feature-01abc".to_string()));
assert_eq!(lookup("OMNE_NODE_ID"), Some("plan".to_string()));
assert_eq!(
lookup("OMNE_VOLUME_ROOT"),
Some(volume_root.to_string_lossy().into_owned())
);
assert_eq!(
lookup("OMNE_INPUT_FEATURE_NAME"),
Some("add-hello".to_string())
);
assert_eq!(lookup("OMNE_INPUT_SCOPE"), Some("src/api".to_string()));
}
// Without inputs only the three base variables are produced.
#[test]
fn build_ai_env_vars_no_inputs_still_has_base_vars() {
let tmp = TempDir::new().unwrap();
let log = EventLog::for_run(tmp.path(), "feature-01xyz").unwrap();
let volume_root: PathBuf = tmp.path().to_path_buf();
let worktree: PathBuf = tmp.path().join(".omne/wt/feature-01xyz");
let ctx = ExecutorContext {
volume_root: &volume_root,
run_id: "feature-01xyz",
worktree: &worktree,
event_log: &log,
inputs: &[],
default_model: None,
claude_bin: None,
default_node_timeout: Duration::from_secs(60),
gate_timeout: Duration::from_secs(60),
};
let node = bare_ai_node("research");
let env = build_ai_env_vars(&node, &ctx);
assert_eq!(env.len(), 3);
assert!(env.iter().all(|(k, _)| !k.starts_with("OMNE_INPUT_")));
}
}