#![allow(dead_code)]
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::path::{Path, PathBuf};
use serde::Deserialize;
use thiserror::Error;
use crate::manifest;
use crate::volume;
/// A pipe definition, deserialized from the YAML frontmatter of a pipe markdown file.
#[derive(Debug, Clone, Deserialize)]
pub struct Pipe {
    /// `name:` of the pipe; echoed in pipe-level validation messages.
    pub name: String,
    /// `version:` value. Not inspected by the validators here.
    pub version: u32,
    /// Optional pipe-wide `default_model:`.
    /// NOTE(review): presumably the model used by AI nodes without their own `model:` — confirm.
    #[serde(default)]
    pub default_model: Option<String>,
    /// Declared inputs keyed by input name (sorted, since this is a `BTreeMap`). Keys are
    /// checked against `[A-Za-z0-9_]+` by `validate_structure`.
    #[serde(default)]
    pub inputs: BTreeMap<String, InputSpec>,
    /// The DAG nodes in declaration order. Required key; must be non-empty to validate.
    pub nodes: Vec<Node>,
}
/// Declaration of one pipe input (an entry under `inputs:`).
#[derive(Debug, Clone, Deserialize)]
pub struct InputSpec {
    /// Value of the `type:` key.
    ///
    /// Defaults to an empty string when the key is absent. `validate_structure` can then report
    /// it as `ValidationError::InputMissingType`, alongside every other issue in the file.
    /// Without the default, the whole file would fail to deserialize with a generic YAML error.
    #[serde(rename = "type", default)]
    pub kind: String,
    /// `required:` flag; `false` when omitted. Not checked by the validators here.
    #[serde(default)]
    pub required: bool,
    /// Optional `default:` value for the input.
    #[serde(default)]
    pub default: Option<String>,
}
/// One node of the pipe DAG, as written in the `nodes:` list.
///
/// Exactly one of `command`, `prompt`, `bash` or `loop` must be set (enforced by `check_one_of`).
#[derive(Debug, Clone, Deserialize)]
pub struct Node {
    /// Node identifier. Must be unique within the pipe; `depends_on` entries refer to it.
    pub id: String,
    /// Ids of the nodes this node depends on. Each must name a node of the same pipe, and the
    /// resulting graph must be acyclic.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Model override; rejected on bash nodes.
    #[serde(default)]
    pub model: Option<String>,
    /// Tool allow-list; must be empty on bash nodes.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Gate hook name. Must be a safe file stem, and `<dist>/hooks/<gate>.sh` (`.ps1` on
    /// Windows) must exist.
    #[serde(default)]
    pub gate: Option<String>,
    /// `timeout:` value. NOTE(review): the unit is not established in this file — confirm.
    #[serde(default)]
    pub timeout: Option<u64>,
    /// How dependencies trigger this node; `TriggerRule::AllSuccess` when omitted.
    #[serde(default)]
    pub trigger_rule: TriggerRule,
    /// Skill name, resolved to `<name>.md` under the dist or core `skills` directory.
    #[serde(default)]
    pub command: Option<String>,
    /// `prompt:` value (presumably inline prompt text).
    #[serde(default)]
    pub prompt: Option<String>,
    /// Shell body; `collect_warnings` scans it for unquoted `OMNE_INPUT_*` references.
    #[serde(default)]
    pub bash: Option<String>,
    /// Body of the `loop:` key (renamed because `loop` is a Rust keyword).
    #[serde(default, rename = "loop")]
    pub loop_: Option<LoopBody>,
}
/// Body of a `loop:` node.
///
/// `check_loop_body` requires:
/// - exactly one of `command`/`prompt`;
/// - a non-blank `until` other than `BLOCKED`;
/// - `max_iterations` in `1..=MAX_ITERATIONS_CAP`.
#[derive(Debug, Clone, Deserialize)]
pub struct LoopBody {
    /// `command:` body; resolved against the skills directories like `Node::command`.
    #[serde(default)]
    pub command: Option<String>,
    /// `prompt:` body.
    #[serde(default)]
    pub prompt: Option<String>,
    /// Sentinel token for ending the loop.
    /// NOTE(review): matched by the runtime, which is not in this file — confirm how.
    pub until: String,
    /// Iteration bound (required key).
    pub max_iterations: u32,
    /// `fresh_context:` flag, `false` when omitted.
    /// NOTE(review): presumably resets context between iterations — confirm with the runtime.
    #[serde(default)]
    pub fresh_context: bool,
}
impl Pipe {
    /// Reports whether any node sets `command`, `prompt` or `loop`, i.e. the AI-backed kinds.
    ///
    /// Unlike `Node::is_ai`, the node does not have to be well-formed: a node that sets `bash`
    /// as well as one of these fields still counts.
    pub fn needs_claude(&self) -> bool {
        for node in &self.nodes {
            let has_ai_body =
                node.command.is_some() || node.prompt.is_some() || node.loop_.is_some();
            if has_ai_body {
                return true;
            }
        }
        false
    }
}
/// Value of a node's `trigger_rule:` key, spelled in snake_case in YAML (`all_success`,
/// `one_success`). Defaults to `AllSuccess` when omitted.
///
/// NOTE(review): the scheduling semantics live outside this file. The names suggest "run when
/// all dependencies succeeded" vs "run when at least one succeeded" — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerRule {
    /// `all_success` (the default).
    #[default]
    AllSuccess,
    /// `one_success`.
    OneSuccess,
}
/// The mutually exclusive ways a node can execute. Each corresponds to one node key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionKind {
    /// `command:` — names a skill file (`<name>.md`).
    Command,
    /// `prompt:` — prompt body.
    Prompt,
    /// `bash:` — shell body.
    Bash,
    /// `loop:` — a command/prompt body repeated under `until` / `max_iterations`.
    Loop,
}

impl ExecutionKind {
    /// The YAML key that selects this kind, as used in validation messages.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Command => "command",
            Self::Prompt => "prompt",
            Self::Bash => "bash",
            Self::Loop => "loop",
        }
    }
}
impl Node {
    /// The node's single execution kind.
    ///
    /// Returns `None` when the node sets none, or more than one, of `command` / `prompt` /
    /// `bash` / `loop`. `check_one_of` reports that situation as an error.
    pub fn execution_kind(&self) -> Option<ExecutionKind> {
        match self.populated_kinds().as_slice() {
            [single] => Some(*single),
            _ => None,
        }
    }

    /// Every execution kind whose field is set, in the fixed order command, prompt, bash, loop.
    fn populated_kinds(&self) -> Vec<ExecutionKind> {
        let candidates = [
            (self.command.is_some(), ExecutionKind::Command),
            (self.prompt.is_some(), ExecutionKind::Prompt),
            (self.bash.is_some(), ExecutionKind::Bash),
            (self.loop_.is_some(), ExecutionKind::Loop),
        ];
        candidates
            .iter()
            .filter(|(present, _)| *present)
            .map(|&(_, kind)| kind)
            .collect()
    }

    /// `true` only for a well-formed node whose single kind is command, prompt or loop.
    pub fn is_ai(&self) -> bool {
        matches!(
            self.execution_kind(),
            Some(ExecutionKind::Command | ExecutionKind::Prompt | ExecutionKind::Loop)
        )
    }
}
/// Failures while turning a pipe markdown file into a [`Pipe`], before any validation runs.
#[derive(Debug, Error)]
pub enum ParseError {
    /// `manifest::extract_frontmatter_block` failed; its own error is discarded by `parse_str`.
    #[error("pipe file is missing its `---...---` YAML frontmatter")]
    MissingFrontmatter,
    /// The frontmatter was found but did not deserialize into a [`Pipe`].
    #[error("pipe YAML is malformed: {0}")]
    Yaml(#[from] serde_yml::Error),
    /// The file at `path` could not be read (including non-UTF-8 content).
    #[error("cannot read pipe file {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
}
/// Sentinel that `loop.until` may not use (rejected by `check_loop_body`).
/// NOTE(review): presumably reserved for a runtime "blocked" signal — confirm.
pub const RESERVED_BLOCKED: &str = "BLOCKED";
/// Largest accepted `loop.max_iterations` (inclusive).
pub const MAX_ITERATIONS_CAP: u32 = 1000;
/// A single problem found by `validate_structure` or `validate_with_volume`.
///
/// The validators collect these into a `Vec` rather than stopping at the first one.
#[derive(Debug, Error)]
pub enum ValidationError {
    /// The `nodes:` list is empty.
    #[error("pipe `{pipe}` has empty `nodes:` list")]
    EmptyNodes { pipe: String },
    /// Two or more nodes share `id`. Reported once per repeated occurrence.
    #[error("pipe `{pipe}` declares duplicate node id `{id}`")]
    DuplicateNodeId { pipe: String, id: String },
    /// None of command/prompt/bash/loop is set.
    #[error("node `{node_id}` declares no execution kind (need exactly one of command/prompt/bash/loop)")]
    NoExecutionKind { node_id: String },
    /// More than one kind is set; `kinds` lists them in canonical order.
    #[error("node `{node_id}` declares multiple execution kinds: {kinds:?}")]
    MultipleExecutionKinds {
        node_id: String,
        kinds: Vec<&'static str>,
    },
    /// A `depends_on` entry names no node of this pipe.
    #[error("node `{node_id}` depends on unknown node `{missing}`")]
    UnknownDepends { node_id: String, missing: String },
    /// The dependency graph contains a cycle; `ids` is the path reported by `detect_cycle`.
    #[error("DAG cycle: {ids:?}")]
    Cycle { ids: Vec<String> },
    /// `model:` set on a bash node.
    #[error("node `{node_id}` is a bash node — `model:` is only allowed on AI nodes (command/prompt/loop)")]
    ModelOnBash { node_id: String },
    /// Non-empty `allowed_tools:` on a bash node.
    #[error("node `{node_id}` is a bash node — `allowed_tools:` is only allowed on AI nodes")]
    AllowedToolsOnBash { node_id: String },
    /// `loop.until` is empty or whitespace-only.
    #[error("node `{node_id}` loop.until is empty — must name a sentinel token")]
    EmptyLoopUntil { node_id: String },
    /// `loop.until` is the reserved `BLOCKED` sentinel.
    #[error("node `{node_id}` loop.until uses reserved sentinel `BLOCKED` — pick any other token")]
    ReservedSentinelInLoopUntil { node_id: String },
    /// `loop.max_iterations` is 0.
    #[error("node `{node_id}` loop.max_iterations must be > 0")]
    ZeroMaxIterations { node_id: String },
    /// `loop.max_iterations` exceeds `MAX_ITERATIONS_CAP`.
    #[error("node `{node_id}` loop.max_iterations `{max}` exceeds cap `{cap}` — set a smaller value or split the loop into gated chunks")]
    MaxIterationsTooLarge { node_id: String, max: u32, cap: u32 },
    /// The `loop:` sets neither `command` nor `prompt`.
    #[error("node `{node_id}` loop has no body (need exactly one of command/prompt)")]
    LoopNoBody { node_id: String },
    /// The `loop:` sets both `command` and `prompt`.
    #[error("node `{node_id}` loop has both command and prompt — pick one")]
    LoopMultipleBodies { node_id: String },
    /// A command (top-level or in `loop:`) has no `<name>.md` in the dist or core skills dir.
    #[error("node `{node_id}` references unknown command `{name}` — not found at dist/skills/{name}.md or core/skills/{name}.md")]
    UnknownCommand { node_id: String, name: String },
    /// The gate's hook script for the current platform is not a file at `expected_path`.
    #[error("node `{node_id}` references gate `{gate}` but the platform-current hook script is missing: {expected_path}")]
    GateMissing {
        node_id: String,
        gate: String,
        expected_path: PathBuf,
    },
    /// The gate name is unsafe to use as a file stem; `reason` comes from `bad_gate_name_reason`.
    #[error("node `{node_id}` has invalid gate name `{gate}`: {reason}")]
    InvalidGateName {
        node_id: String,
        gate: String,
        reason: &'static str,
    },
    /// Raised by `validate_structure` for an input whose `type` value is empty.
    #[error("pipe input `{name}` is missing its `type:` field")]
    InputMissingType { name: String },
    /// An input key contains characters outside `[A-Za-z0-9_]`, or is empty.
    #[error("pipe input `{name}` contains invalid characters — keys must match `[A-Za-z0-9_]+` so they can be exported as `OMNE_INPUT_<KEY>` env vars")]
    InvalidInputKey { name: String },
}
/// Parses a pipe from markdown text by deserializing its YAML frontmatter block.
///
/// No validation is performed; see `validate_structure` / `validate_with_volume`.
///
/// # Errors
///
/// - [`ParseError::MissingFrontmatter`] if the frontmatter block cannot be extracted.
/// - [`ParseError::Yaml`] if the YAML does not deserialize into a [`Pipe`].
pub fn parse_str(md: &str) -> Result<Pipe, ParseError> {
    let Ok(yaml) = manifest::extract_frontmatter_block(md) else {
        return Err(ParseError::MissingFrontmatter);
    };
    serde_yml::from_str(&yaml).map_err(ParseError::Yaml)
}
/// Reads the file at `path` as UTF-8 text and parses it with [`parse_str`]. No validation.
///
/// # Errors
///
/// - [`ParseError::Io`] (carrying `path`) if the file cannot be read.
/// - Otherwise, whatever [`parse_str`] returns.
pub fn load_from_path(path: &Path) -> Result<Pipe, ParseError> {
    match std::fs::read_to_string(path) {
        Ok(md) => parse_str(&md),
        Err(source) => Err(ParseError::Io {
            path: path.to_path_buf(),
            source,
        }),
    }
}
/// Loads a pipe file and fully validates it against the volume rooted at `volume_root`.
///
/// Both structural and volume validation always run. Their issues are concatenated,
/// structural first.
///
/// # Errors
///
/// - [`LoadError::Parse`] if the file cannot be read or parsed.
/// - [`LoadError::Invalid`] with every validation issue otherwise.
pub fn load(md_path: &Path, volume_root: &Path) -> Result<Pipe, LoadError> {
    let pipe = load_from_path(md_path).map_err(LoadError::Parse)?;

    let mut errors = validate_structure(&pipe).err().unwrap_or_default();
    errors.extend(
        validate_with_volume(&pipe, volume_root)
            .err()
            .unwrap_or_default(),
    );

    if errors.is_empty() {
        Ok(pipe)
    } else {
        Err(LoadError::Invalid(errors))
    }
}
/// Error returned by [`load`]. Either the file could not be read or parsed, or it parsed but
/// failed structural and/or volume validation.
#[derive(Debug, Error)]
pub enum LoadError {
    /// Reading or parsing failed; displayed exactly as the inner [`ParseError`].
    #[error(transparent)]
    Parse(ParseError),
    /// Every validation issue found. The display message only states how many there are.
    #[error("pipe failed validation with {} issue(s)", .0.len())]
    Invalid(Vec<ValidationError>),
}
/// Runs every check that needs only the parsed pipe (no filesystem access).
///
/// All issues are collected rather than stopping at the first, so an author sees the complete
/// list in one pass. The checks are:
/// - `nodes:` is non-empty;
/// - each input has a non-blank `type:` and a key matching `[A-Za-z0-9_]+`;
/// - node ids are unique;
/// - each node has exactly one execution kind;
/// - bash nodes follow the scope rules;
/// - loop bodies are valid;
/// - `depends_on` targets exist;
/// - the dependency graph is acyclic.
///
/// # Errors
///
/// Returns every [`ValidationError`] found, in the order the checks ran.
pub fn validate_structure(pipe: &Pipe) -> Result<(), Vec<ValidationError>> {
    let mut errors = Vec::new();

    // Report an empty pipe but keep going: the input checks below do not depend on nodes,
    // and the node/graph checks are no-ops on an empty list.
    if pipe.nodes.is_empty() {
        errors.push(ValidationError::EmptyNodes {
            pipe: pipe.name.clone(),
        });
    }

    for (name, spec) in &pipe.inputs {
        // Whitespace-only counts as missing, just as `loop.until` is checked after trimming.
        if spec.kind.trim().is_empty() {
            errors.push(ValidationError::InputMissingType { name: name.clone() });
        }
        if !is_valid_input_key(name) {
            errors.push(ValidationError::InvalidInputKey { name: name.clone() });
        }
    }

    let mut seen: BTreeSet<&str> = BTreeSet::new();
    for node in &pipe.nodes {
        if !seen.insert(node.id.as_str()) {
            errors.push(ValidationError::DuplicateNodeId {
                pipe: pipe.name.clone(),
                id: node.id.clone(),
            });
        }
    }

    for node in &pipe.nodes {
        check_one_of(node, &mut errors);
        check_scope_rules(node, &mut errors);
        check_loop_body(node, &mut errors);
    }

    check_depends_on(pipe, &mut errors);
    if let Err(cycle) = detect_cycle(pipe) {
        errors.push(cycle);
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
/// Whether `key` is usable as the `<KEY>` suffix of an `OMNE_INPUT_<KEY>` environment
/// variable: non-empty, made only of ASCII letters, ASCII digits and `_`.
fn is_valid_input_key(key: &str) -> bool {
    if key.is_empty() {
        return false;
    }
    // Any non-ASCII character contributes bytes >= 0x80, which fail the check as well.
    key.bytes().all(|b| b == b'_' || b.is_ascii_alphanumeric())
}
/// Enforces "exactly one execution kind per node".
///
/// Reports a node that sets none of `command`/`prompt`/`bash`/`loop`, or more than one of
/// them. In the latter case the offending kinds are listed in canonical order.
fn check_one_of(node: &Node, errors: &mut Vec<ValidationError>) {
    let kinds = node.populated_kinds();
    if kinds.is_empty() {
        errors.push(ValidationError::NoExecutionKind {
            node_id: node.id.clone(),
        });
    } else if kinds.len() > 1 {
        errors.push(ValidationError::MultipleExecutionKinds {
            node_id: node.id.clone(),
            kinds: kinds.iter().copied().map(ExecutionKind::as_str).collect(),
        });
    }
}
/// Bash nodes may not carry AI-only settings.
///
/// Reports `model:` and a non-empty `allowed_tools:` on a well-formed bash node. Other nodes,
/// including malformed ones with several kinds, are not inspected.
fn check_scope_rules(node: &Node, errors: &mut Vec<ValidationError>) {
    if node.execution_kind() == Some(ExecutionKind::Bash) {
        let node_id = || node.id.clone();
        if node.model.is_some() {
            errors.push(ValidationError::ModelOnBash { node_id: node_id() });
        }
        if !node.allowed_tools.is_empty() {
            errors.push(ValidationError::AllowedToolsOnBash { node_id: node_id() });
        }
    }
}
/// Validates the `loop:` body of a node; nodes without `loop:` are skipped.
///
/// Pushes one error per violated rule:
/// - exactly one of `loop.command` / `loop.prompt` must be set;
/// - `loop.until` must be non-blank and must not be the reserved [`RESERVED_BLOCKED`] sentinel;
/// - `loop.max_iterations` must be in `1..=MAX_ITERATIONS_CAP`.
fn check_loop_body(node: &Node, errors: &mut Vec<ValidationError>) {
    let Some(body) = &node.loop_ else {
        return;
    };

    match (body.command.is_some(), body.prompt.is_some()) {
        (false, false) => errors.push(ValidationError::LoopNoBody {
            node_id: node.id.clone(),
        }),
        (true, true) => errors.push(ValidationError::LoopMultipleBodies {
            node_id: node.id.clone(),
        }),
        _ => {}
    }

    // Both `until` checks use the trimmed token. The emptiness check already ignored
    // surrounding whitespace; the reserved-sentinel check must too, so `" BLOCKED "` cannot
    // slip past validation.
    let until = body.until.trim();
    if until.is_empty() {
        errors.push(ValidationError::EmptyLoopUntil {
            node_id: node.id.clone(),
        });
    } else if until == RESERVED_BLOCKED {
        errors.push(ValidationError::ReservedSentinelInLoopUntil {
            node_id: node.id.clone(),
        });
    }

    if body.max_iterations == 0 {
        errors.push(ValidationError::ZeroMaxIterations {
            node_id: node.id.clone(),
        });
    } else if body.max_iterations > MAX_ITERATIONS_CAP {
        errors.push(ValidationError::MaxIterationsTooLarge {
            node_id: node.id.clone(),
            max: body.max_iterations,
            cap: MAX_ITERATIONS_CAP,
        });
    }
}
/// Reports every `depends_on` entry that does not name a node declared in this pipe.
/// Reports appear in node order, then in `depends_on` order.
fn check_depends_on(pipe: &Pipe, errors: &mut Vec<ValidationError>) {
    let declared_ids: BTreeSet<&str> = pipe.nodes.iter().map(|n| n.id.as_str()).collect();
    for node in &pipe.nodes {
        let dangling = node
            .depends_on
            .iter()
            .filter(|dep| !declared_ids.contains(dep.as_str()));
        for missing in dangling {
            errors.push(ValidationError::UnknownDepends {
                node_id: node.id.clone(),
                missing: missing.clone(),
            });
        }
    }
}
/// Depth-first search for a dependency cycle among the pipe's nodes.
///
/// Edges run from each node to the ids in its `depends_on`. Roots are tried in declaration
/// order with white/gray/black colouring. Reaching a gray node means it is on the current DFS
/// path, so the edge just followed closes a cycle.
///
/// Dependencies on undeclared ids have no outgoing edges here and so act as leaves. They are
/// reported separately by `check_depends_on`.
///
/// NOTE(review): the DFS is recursive, so stack depth grows with the longest dependency chain.
///
/// # Errors
///
/// Returns [`ValidationError::Cycle`] for the first cycle found. `ids` lists only the nodes on
/// the cycle, in dependency order, with the first id repeated at the end (e.g. `["b", "c", "b"]`
/// for `a -> b -> c -> b`). Nodes that merely lead into the cycle are not included.
fn detect_cycle(pipe: &Pipe) -> Result<(), ValidationError> {
    #[derive(Clone, Copy, PartialEq)]
    enum Color {
        White,
        Gray,
        Black,
    }
    let edges: HashMap<&str, Vec<&str>> = pipe
        .nodes
        .iter()
        .map(|n| {
            (
                n.id.as_str(),
                n.depends_on.iter().map(String::as_str).collect(),
            )
        })
        .collect();
    let mut color: HashMap<&str, Color> = pipe
        .nodes
        .iter()
        .map(|n| (n.id.as_str(), Color::White))
        .collect();
    // Current DFS path, root first.
    let mut stack: Vec<&str> = Vec::new();

    /// Visits `id`; returns the cycle path if one is reachable from it.
    fn visit<'a>(
        id: &'a str,
        edges: &HashMap<&'a str, Vec<&'a str>>,
        color: &mut HashMap<&'a str, Color>,
        stack: &mut Vec<&'a str>,
    ) -> Option<Vec<String>> {
        match color.get(id).copied().unwrap_or(Color::White) {
            Color::Black => return None,
            Color::Gray => {
                // `id` is already on the DFS path, so the cycle is the suffix of the path that
                // starts at `id`. Entries before it only lead into the cycle. Gray nodes are
                // always on `stack`; the fallback to 0 is purely defensive.
                let start = stack.iter().position(|s| *s == id).unwrap_or(0);
                let mut cycle: Vec<String> =
                    stack[start..].iter().map(|s| s.to_string()).collect();
                cycle.push(id.to_string());
                return Some(cycle);
            }
            Color::White => {}
        }
        color.insert(id, Color::Gray);
        stack.push(id);
        if let Some(deps) = edges.get(id) {
            for dep in deps {
                if let Some(c) = visit(dep, edges, color, stack) {
                    return Some(c);
                }
            }
        }
        stack.pop();
        color.insert(id, Color::Black);
        None
    }

    for node in &pipe.nodes {
        if color.get(node.id.as_str()).copied() == Some(Color::White) {
            if let Some(cycle) = visit(node.id.as_str(), &edges, &mut color, &mut stack) {
                return Err(ValidationError::Cycle { ids: cycle });
            }
        }
    }
    Ok(())
}
/// Runs the checks that need the installed volume at `volume_root`.
///
/// - Every `command` (on the node itself or inside its `loop:`) must resolve to `<name>.md`
///   under the dist or core `skills` directory.
/// - Every `gate` must be a safe name whose hook script exists at
///   `<dist>/hooks/<gate>.<ext>`, where `<ext>` is `sh`, or `ps1` on Windows. An unsafe name is
///   reported on its own and its hook path is not probed.
///
/// # Errors
///
/// Returns every [`ValidationError`] found, in node order.
pub fn validate_with_volume(pipe: &Pipe, volume_root: &Path) -> Result<(), Vec<ValidationError>> {
    let dist_skills = volume::dist_dir(volume_root).join("skills");
    let core_skills = volume::core_dir(volume_root).join("skills");
    let dist_hooks = volume::dist_dir(volume_root).join("hooks");
    let mut errors = Vec::new();

    for node in &pipe.nodes {
        let loop_command = node.loop_.as_ref().and_then(|body| body.command.as_ref());
        for name in node.command.iter().chain(loop_command) {
            check_command_exists(node, name, &dist_skills, &core_skills, &mut errors);
        }

        let Some(gate) = &node.gate else {
            continue;
        };
        match bad_gate_name_reason(gate) {
            Some(reason) => errors.push(ValidationError::InvalidGateName {
                node_id: node.id.clone(),
                gate: gate.clone(),
                reason,
            }),
            None => {
                let hook = dist_hooks.join(format!("{gate}.{}", platform_hook_extension()));
                if !hook.is_file() {
                    errors.push(ValidationError::GateMissing {
                        node_id: node.id.clone(),
                        gate: gate.clone(),
                        expected_path: hook,
                    });
                }
            }
        }
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
/// Pushes [`ValidationError::UnknownCommand`] unless `name` resolves to `<name>.md` under
/// `dist_skills` or `core_skills`. The dist directory is tried first.
///
/// `name` must be a relative path made only of normal components (e.g. `review` or
/// `team/review`; `.` components are tolerated). Absolute paths, `..` components and Windows
/// drive prefixes are treated as unknown without touching the filesystem. Otherwise
/// `Path::join` would resolve them outside the skills directories, since an absolute or
/// prefixed path replaces the base entirely. This mirrors the traversal guard on gate names.
fn check_command_exists(
    node: &Node,
    name: &str,
    dist_skills: &Path,
    core_skills: &Path,
    errors: &mut Vec<ValidationError>,
) {
    let stays_inside = Path::new(name).components().all(|part| {
        matches!(
            part,
            std::path::Component::Normal(_) | std::path::Component::CurDir
        )
    });
    let found = stays_inside && {
        let file_name = format!("{name}.md");
        dist_skills.join(&file_name).is_file() || core_skills.join(&file_name).is_file()
    };
    if !found {
        errors.push(ValidationError::UnknownCommand {
            node_id: node.id.clone(),
            name: name.to_string(),
        });
    }
}
/// Explains why `gate` is unsafe to use as a hook file stem, or returns `None` if it is fine.
///
/// The name is joined onto the hooks directory as `<gate>.<ext>`, so anything that could make
/// that path escape the directory is rejected:
/// - path separators (both `/` and `\`, on every platform);
/// - `:`. On Windows, `C:evil` carries a drive prefix, which makes `Path::join` discard the
///   base directory, and `name:stream` addresses an NTFS alternate data stream;
/// - empty dot-separated segments, which covers `.`, `..`, leading, trailing and doubled dots;
/// - NUL bytes.
fn bad_gate_name_reason(gate: &str) -> Option<&'static str> {
    if gate.is_empty() {
        return Some("empty gate name");
    }
    if gate.contains('/') || gate.contains('\\') {
        return Some("gate name must not contain path separators");
    }
    if gate.contains(':') {
        return Some("gate name must not contain `:` (drive prefix / alternate data stream)");
    }
    // Also catches "." and "..", whose segments are all empty.
    if gate.split('.').any(str::is_empty) {
        return Some("gate name must not contain `..` or leading/trailing dots");
    }
    if gate.contains('\0') {
        return Some("gate name must not contain NUL bytes");
    }
    None
}
/// File extension of gate hook scripts for the platform this binary was built for:
/// `ps1` on Windows, `sh` everywhere else.
fn platform_hook_extension() -> &'static str {
    if cfg!(windows) {
        "ps1"
    } else {
        "sh"
    }
}
/// Non-fatal findings produced by `collect_warnings`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationWarning {
    /// A bash body uses an `OMNE_INPUT_*` reference that is not directly preceded by a double
    /// quote. `reference` is the exact text found, e.g. `$OMNE_INPUT_TASK`.
    UnquotedOmneInputRef {
        node_id: String,
        reference: String,
    },
}

impl std::fmt::Display for ValidationWarning {
    /// Renders the user-facing warning, including the suggested quoted form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::UnquotedOmneInputRef { node_id, reference } = self;
        write!(
            f,
            "node `{node_id}` bash body references {reference} without surrounding double quotes — an input value containing shell metacharacters will be word-split. Wrap the reference as \"{reference}\" to mitigate injection.",
        )
    }
}
/// Collects non-fatal warnings for the pipe.
///
/// Currently the only warning is an unquoted `OMNE_INPUT_*` reference in a bash body.
/// Results are ordered by node, then by the order `find_unquoted_omne_input_refs` reports them.
pub fn collect_warnings(pipe: &Pipe) -> Vec<ValidationWarning> {
    pipe.nodes
        .iter()
        .filter_map(|node| node.bash.as_deref().map(|script| (node, script)))
        .flat_map(|(node, script)| {
            find_unquoted_omne_input_refs(script)
                .into_iter()
                .map(move |reference| ValidationWarning::UnquotedOmneInputRef {
                    node_id: node.id.clone(),
                    reference,
                })
        })
        .collect()
}
/// Heuristically finds `OMNE_INPUT_*` references in a bash body that are not preceded by a
/// double quote. Returns each offending reference text, e.g. `$OMNE_INPUT_TASK` or
/// `%OMNE_INPUT_TASK%`.
///
/// Two syntaxes are scanned, one full pass each, so all `$` hits come before all `%` hits in
/// the output:
/// - `$OMNE_INPUT_<NAME>`, with no closing delimiter;
/// - `%OMNE_INPUT_<NAME>%`, reported only when the closing `%` is present.
///
/// `<NAME>` is the longest run of ASCII uppercase letters, digits and `_` after the prefix. A
/// prefix followed by no such character is ignored. A reference counts as quoted only if the
/// byte immediately before the sigil is `"`.
///
/// NOTE(review): known limits of this heuristic:
/// - `"x=$OMNE_INPUT_A"` is still reported, because the quote is not immediately before `$`;
/// - a reference inside single quotes is still reported;
/// - the braced `${OMNE_INPUT_A}` form is never matched.
fn find_unquoted_omne_input_refs(body: &str) -> Vec<String> {
    const UNIX_PREFIX: &str = "$OMNE_INPUT_";
    const WIN_PREFIX: &str = "%OMNE_INPUT_";
    let bytes = body.as_bytes();
    let mut out = Vec::new();
    // `end_char` is the required closing delimiter, for syntaxes that have one.
    for (sigil, end_char) in [(UNIX_PREFIX, None), (WIN_PREFIX, Some(b'%'))] {
        // Byte offset where the next search starts. Every advance lands right after an ASCII
        // byte, so `body[from..]` always slices on a char boundary.
        let mut from = 0;
        while let Some(rel) = body[from..].find(sigil) {
            let start = from + rel;
            let preceded_by_quote = start > 0 && bytes[start - 1] == b'"';
            let name_start = start + sigil.len();
            // Extend over the variable name: ASCII uppercase, digits, underscore.
            let mut end = name_start;
            while end < bytes.len() {
                let c = bytes[end];
                if c.is_ascii_uppercase() || c.is_ascii_digit() || c == b'_' {
                    end += 1;
                } else {
                    break;
                }
            }
            let mut match_end = end;
            if let Some(close) = end_char {
                if end < bytes.len() && bytes[end] == close {
                    // Include and consume the closing delimiter, so it cannot start the next match.
                    match_end = end + 1;
                } else {
                    // Unterminated `%...`: not treated as a reference; resume after the name.
                    from = match_end.max(start + 1);
                    continue;
                }
            }
            // Skip quoted references and bare prefixes with an empty name.
            if !preceded_by_quote && end > name_start {
                out.push(body[start..match_end].to_string());
            }
            from = match_end.max(start + 1);
        }
    }
    out
}
#[cfg(test)]
mod tests {
    // Unit tests for frontmatter parsing, structural validation, bash-input warnings and
    // gate-name checks.
    //
    // The YAML fixtures are indentation-sensitive: node fields nest under each `- id:` item,
    // `type:` under each input key, and loop fields under `loop:`. Fixture lines inside the
    // string literals therefore start at column 0, whatever the surrounding Rust indentation.
    use super::*;

    /// Wraps a YAML fragment in `---` fences plus a markdown body, then parses it.
    fn yaml_to_pipe(yaml: &str) -> Result<Pipe, ParseError> {
        let md = format!("---\n{yaml}---\n\n# pipe body\n");
        parse_str(&md)
    }

    #[test]
    fn parses_minimal_pipe() {
        let yaml = r#"name: feature
version: 1
nodes:
  - id: only
    bash: echo hi
"#;
        let pipe = yaml_to_pipe(yaml).unwrap();
        assert_eq!(pipe.name, "feature");
        assert_eq!(pipe.nodes.len(), 1);
        assert_eq!(pipe.nodes[0].execution_kind(), Some(ExecutionKind::Bash));
    }

    #[test]
    fn structural_validator_accepts_minimal_bash_pipe() {
        let pipe = yaml_to_pipe(
            r#"name: feature
version: 1
nodes:
  - id: only
    bash: echo hi
"#,
        )
        .unwrap();
        validate_structure(&pipe).unwrap();
    }

    #[test]
    fn input_key_with_equals_sign_rejected() {
        let pipe = yaml_to_pipe(
            r#"name: bad_input_key
version: 1
inputs:
  "foo=bar":
    type: string
nodes:
  - id: only
    bash: echo hi
"#,
        )
        .unwrap();
        let errs = validate_structure(&pipe).unwrap_err();
        assert!(
            errs.iter().any(
                |e| matches!(e, ValidationError::InvalidInputKey { name } if name == "foo=bar")
            ),
            "expected InvalidInputKey for `foo=bar`, got {errs:?}"
        );
    }

    #[test]
    fn input_key_with_space_rejected() {
        let pipe = yaml_to_pipe(
            r#"name: spacey_key
version: 1
inputs:
  "has space":
    type: string
nodes:
  - id: only
    bash: echo hi
"#,
        )
        .unwrap();
        let errs = validate_structure(&pipe).unwrap_err();
        assert!(
            errs.iter()
                .any(|e| matches!(e, ValidationError::InvalidInputKey { .. })),
            "spaces rejected"
        );
    }

    #[test]
    fn input_key_allows_underscores_and_digits() {
        let pipe = yaml_to_pipe(
            r#"name: ok_keys
version: 1
inputs:
  task_1:
    type: string
  OMNE_VERSION:
    type: string
nodes:
  - id: only
    bash: echo hi
"#,
        )
        .unwrap();
        validate_structure(&pipe).expect("alphanumeric + underscore keys valid");
    }

    #[test]
    fn cycle_detected_in_two_node_loop() {
        let pipe = yaml_to_pipe(
            r#"name: cyclic
version: 1
nodes:
  - id: a
    bash: echo a
    depends_on: [b]
  - id: b
    bash: echo b
    depends_on: [a]
"#,
        )
        .unwrap();
        let errs = validate_structure(&pipe).unwrap_err();
        assert!(errs
            .iter()
            .any(|e| matches!(e, ValidationError::Cycle { .. })));
    }

    #[test]
    fn max_iterations_above_cap_rejected() {
        let pipe = yaml_to_pipe(
            r#"name: unbounded_loop
version: 1
nodes:
  - id: l1
    loop:
      prompt: keep going
      until: DONE
      max_iterations: 999999999
"#,
        )
        .unwrap();
        let errs = validate_structure(&pipe).unwrap_err();
        assert!(
            errs.iter().any(|e| matches!(
                e,
                ValidationError::MaxIterationsTooLarge { cap, .. } if *cap == MAX_ITERATIONS_CAP
            )),
            "expected MaxIterationsTooLarge, got {errs:?}"
        );
    }

    #[test]
    fn max_iterations_at_cap_allowed() {
        let yaml = format!(
            "name: at_cap
version: 1
nodes:
  - id: l1
    loop:
      prompt: keep going
      until: DONE
      max_iterations: {MAX_ITERATIONS_CAP}
"
        );
        let pipe = yaml_to_pipe(&yaml).unwrap();
        validate_structure(&pipe).expect("value exactly at cap is allowed");
    }

    #[test]
    fn unquoted_dollar_omne_input_emits_warning() {
        let pipe = yaml_to_pipe(
            r#"name: unquoted
version: 1
inputs:
  task:
    type: string
nodes:
  - id: only
    bash: "echo value=$OMNE_INPUT_TASK"
"#,
        )
        .unwrap();
        let warnings = collect_warnings(&pipe);
        assert_eq!(warnings.len(), 1, "one warning expected: {warnings:?}");
        match &warnings[0] {
            ValidationWarning::UnquotedOmneInputRef { node_id, reference } => {
                assert_eq!(node_id, "only");
                assert_eq!(reference, "$OMNE_INPUT_TASK");
            }
        }
    }

    #[test]
    fn quoted_dollar_omne_input_no_warning() {
        let pipe = yaml_to_pipe(
            r#"name: quoted
version: 1
inputs:
  task:
    type: string
nodes:
  - id: only
    bash: "echo \"$OMNE_INPUT_TASK\""
"#,
        )
        .unwrap();
        assert!(collect_warnings(&pipe).is_empty());
    }

    #[test]
    fn percent_omne_input_unquoted_emits_warning() {
        let pipe = yaml_to_pipe(
            r#"name: win_unquoted
version: 1
inputs:
  task:
    type: string
nodes:
  - id: only
    bash: "echo %OMNE_INPUT_TASK%"
"#,
        )
        .unwrap();
        let warnings = collect_warnings(&pipe);
        assert_eq!(warnings.len(), 1);
        match &warnings[0] {
            ValidationWarning::UnquotedOmneInputRef { reference, .. } => {
                assert_eq!(reference, "%OMNE_INPUT_TASK%");
            }
        }
    }

    #[test]
    fn bad_gate_name_reason_rejects_traversal_and_separators() {
        assert!(bad_gate_name_reason("").is_some());
        assert!(bad_gate_name_reason("../evil").is_some());
        assert!(bad_gate_name_reason("..\\evil").is_some());
        assert!(bad_gate_name_reason("foo/bar").is_some());
        assert!(bad_gate_name_reason("foo\\bar").is_some());
        assert!(bad_gate_name_reason("..").is_some());
        assert!(bad_gate_name_reason(".").is_some());
        assert!(bad_gate_name_reason(".hidden").is_some());
        assert!(bad_gate_name_reason("trailing.").is_some());
        assert!(bad_gate_name_reason("with\0nul").is_some());
        assert!(bad_gate_name_reason("pre_commit").is_none());
        assert!(bad_gate_name_reason("research_ok").is_none());
        assert!(bad_gate_name_reason("foo.bar").is_none());
    }
}