#![allow(dead_code)]
use std::collections::BTreeMap;
use std::io::{self, Write};
use std::path::Path;
use clap::Args as ClapArgs;
use ulid::Ulid;
use crate::claude_proc;
use crate::clock;
use crate::dag::{NodeOutcome as DagOutcome, NodeState, Scheduler};
use crate::error::CliError;
use crate::event_log::EventLog;
use crate::events::{Event, Input, PipeAborted, PipeCompleted, PipeStarted};
use crate::executor::{self, ExecutorContext, NodeOutcome};
use crate::manifest;
use crate::pipe::{self, Pipe};
use crate::ulid as ulid_alloc;
use crate::volume;
use crate::worktree::{self, RemoveMode};
// Command-line arguments for running a pipe.
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the CLI output.
#[derive(Debug, ClapArgs)]
pub struct Args {
// Pipe name; `run_at_root` loads `<dist dir>/pipes/<pipe>.md` from it.
pub pipe: String,
// Repeatable `--input KEY=VALUE` pairs; validated against the pipe's
// declared inputs by `resolve_inputs`.
#[arg(long = "input", value_name = "KEY=VALUE")]
pub input: Vec<String>,
}
/// Runs the requested pipe starting from the process's current directory.
///
/// Delegates to [`run_at_root`] with the process stdout as the output sink
/// and `None` for `claude_bin`.
///
/// # Errors
///
/// Returns `CliError::Io` when the current directory cannot be determined,
/// otherwise whatever [`run_at_root`] returns.
pub fn run(args: &Args) -> Result<(), CliError> {
    let start = match std::env::current_dir() {
        Ok(dir) => dir,
        Err(e) => {
            return Err(CliError::Io(format!(
                "cannot determine current directory: {e}"
            )));
        }
    };
    let mut out = io::stdout();
    run_at_root(&start, args, &mut out, None)
}
/// Executes the pipe named by `args.pipe` for the volume found from `start`.
///
/// Steps, in order:
/// 1. locate the volume root (error `NotAVolume` if none) and run the
///    worktree path-length preflight;
/// 2. load `<dist dir>/pipes/<pipe>.md`; load failures become
///    `ValidationFailed`;
/// 3. run the claude preflight only when the pipe reports it needs claude;
/// 4. resolve `--input` values against the pipe's declared inputs;
/// 5. allocate a ULID, build `run_id` as `<pipe name>-<lowercase ulid>`, and
///    print `run_id=<run_id>` to `stdout` (flushed right away, before any
///    node runs);
/// 6. create the worktree and open the run's event log — if opening the log
///    fails, the worktree is force-removed (best effort) before returning;
/// 7. append `PipeStarted`, run the scheduler, then append `PipeCompleted`
///    or `PipeAborted`.
///
/// # Errors
///
/// Any preflight, load, validation, allocation, I/O or event-log failure is
/// returned as a `CliError`. An aborted run yields `CliError::PipeAborted`
/// carrying the run id and the reason.
pub fn run_at_root(
    start: &Path,
    args: &Args,
    stdout: &mut dyn Write,
    claude_bin: Option<&Path>,
) -> Result<(), CliError> {
    let Some(found_root) = volume::find_omne_root(start) else {
        return Err(CliError::NotAVolume);
    };
    let root = strip_unc_prefix(found_root);
    worktree::preflight_volume_path_length(&root)?;

    let pipe_file = format!("{}.md", args.pipe);
    let pipe_path = volume::dist_dir(&root).join("pipes").join(pipe_file);
    let pipe_def = match pipe::load(&pipe_path, &root) {
        Ok(def) => def,
        Err(load_err) => return Err(map_pipe_load_error(load_err, &pipe_path)),
    };
    if pipe_def.needs_claude() {
        claude_proc::preflight(claude_bin)?;
    }
    let inputs = resolve_inputs(&pipe_def, &args.input)?;

    let allocated = ulid_alloc::allocate(&volume::ulid_lock_path(&root))?;
    let ulid_lower = allocated.to_string().to_ascii_lowercase();
    let run_id = format!("{}-{}", pipe_def.name, ulid_lower);

    // Announce the run id before any work starts.
    if let Err(e) = writeln!(stdout, "run_id={run_id}") {
        return Err(CliError::Io(format!("stdout write failed: {e}")));
    }
    if let Err(e) = stdout.flush() {
        return Err(CliError::Io(format!("stdout flush failed: {e}")));
    }

    let wt_path = worktree::create(&root, &run_id)?;
    let log = EventLog::for_run(&root, &run_id).map_err(|e| -> CliError {
        // Best-effort cleanup: the worktree is useless without an event log.
        let _ = worktree::remove(&root, &run_id, RemoveMode::Force);
        e.into()
    })?;

    let distro_version = read_distro_version(&root);
    let started = Event::PipeStarted(PipeStarted {
        id: new_event_id(),
        ts: iso_utc_now(),
        run_id: run_id.clone(),
        pipe: pipe_def.name.clone(),
        inputs: inputs.clone(),
        distro_version,
    });
    log.append(&started)?;

    let mut ctx = ExecutorContext::new(&root, &run_id, &wt_path, &log, &inputs);
    ctx.default_model = pipe_def.default_model.as_deref();
    ctx.claude_bin = claude_bin;

    match run_scheduler(&pipe_def, &ctx)? {
        TerminalOutcome::Completed => {
            let completed = Event::PipeCompleted(PipeCompleted {
                id: new_event_id(),
                ts: iso_utc_now(),
                run_id: run_id.clone(),
            });
            log.append(&completed)?;
            Ok(())
        }
        TerminalOutcome::Aborted { reason } => {
            let aborted = Event::PipeAborted(PipeAborted {
                id: new_event_id(),
                ts: iso_utc_now(),
                run_id: run_id.clone(),
                reason: reason.clone(),
            });
            log.append(&aborted)?;
            Err(CliError::PipeAborted { run_id, reason })
        }
    }
}
/// Final result of a scheduler run, as reported by `run_scheduler`.
enum TerminalOutcome {
/// No node ended in the failed or blocked state.
Completed,
/// At least one node failed or was blocked; `reason` lists the node ids.
Aborted { reason: String },
}
fn run_scheduler<'a>(
pipe_def: &'a Pipe,
ctx: &ExecutorContext<'a>,
) -> Result<TerminalOutcome, CliError> {
let mut scheduler = Scheduler::new(pipe_def);
let nodes_by_id: BTreeMap<&str, &pipe::Node> =
pipe_def.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
while !scheduler.is_terminal() {
let ready = scheduler.ready();
if ready.is_empty() {
for node in &pipe_def.nodes {
if scheduler.state(&node.id) == Some(NodeState::Pending) {
let _ = scheduler.mark(&node.id, DagOutcome::Failed);
}
}
break;
}
for node_id in ready {
let node = nodes_by_id
.get(node_id.as_str())
.expect("scheduler returned id not present in pipe");
let outcome = executor::dispatch(node, ctx)?;
let dag_outcome = match outcome {
NodeOutcome::Completed => DagOutcome::Completed,
NodeOutcome::Failed { .. } => DagOutcome::Failed,
};
let _ = scheduler
.mark(&node_id, dag_outcome)
.map_err(|e| CliError::Io(format!("scheduler mark error: {e}")))?;
}
}
let mut failed: Vec<&str> = Vec::new();
let mut blocked: Vec<&str> = Vec::new();
for id in pipe_def.nodes.iter().map(|n| n.id.as_str()) {
match scheduler.state(id) {
Some(NodeState::Failed) => failed.push(id),
Some(NodeState::Blocked) => blocked.push(id),
_ => {}
}
}
if failed.is_empty() && blocked.is_empty() {
return Ok(TerminalOutcome::Completed);
}
let mut reason = String::new();
if !failed.is_empty() {
reason.push_str(&format!("failed nodes: {}", failed.join(", ")));
}
if !blocked.is_empty() {
if !reason.is_empty() {
reason.push_str("; ");
}
reason.push_str(&format!("blocked nodes: {}", blocked.join(", ")));
}
Ok(TerminalOutcome::Aborted { reason })
}
/// Validates raw `KEY=VALUE` strings against the pipe's declared inputs and
/// produces the effective input list.
///
/// Every problem is collected rather than stopping at the first one:
/// entries without `=`, keys the pipe does not declare, keys given more than
/// once (the last value is the one stored), and required inputs that have
/// neither a provided value nor a default.
///
/// The result follows the iteration order of `pipe_def.inputs`. A provided
/// value wins over the declared default; an input with no value, no default
/// and `required == false` is simply omitted.
///
/// # Errors
///
/// Returns `CliError::ValidationFailed` holding all collected issues when at
/// least one problem was found.
fn resolve_inputs(pipe_def: &Pipe, raw: &[String]) -> Result<Vec<Input>, CliError> {
    let mut issues: Vec<String> = Vec::new();
    let mut provided: BTreeMap<String, String> = BTreeMap::new();

    for entry in raw {
        match entry.split_once('=') {
            None => issues.push(format!(
                "--input {entry:?} must be `key=value` (no `=` found)"
            )),
            Some((raw_key, raw_value)) => {
                let key = raw_key.to_string();
                if pipe_def.inputs.contains_key(&key) {
                    let previous = provided.insert(key.clone(), raw_value.to_string());
                    if previous.is_some() {
                        issues.push(format!("--input {key:?} provided more than once"));
                    }
                } else {
                    issues.push(format!(
                        "--input {key:?} is not declared in pipe `{}` inputs:",
                        pipe_def.name
                    ));
                }
            }
        }
    }

    let mut resolved: Vec<Input> = Vec::new();
    for (name, spec) in &pipe_def.inputs {
        // An explicit value takes precedence over the declared default.
        let chosen = provided.get(name).or(spec.default.as_ref());
        if let Some(value) = chosen {
            resolved.push(Input {
                key: name.clone(),
                value: value.clone(),
            });
        } else if spec.required {
            issues.push(format!(
                "--input {name:?} is required by pipe `{}` but was not provided",
                pipe_def.name
            ));
        }
    }

    if !issues.is_empty() {
        return Err(CliError::ValidationFailed { issues });
    }
    Ok(resolved)
}
/// Converts a pipe load failure into `CliError::ValidationFailed`.
///
/// A parse error yields a single issue; an `Invalid` error yields one issue
/// per contained error. Every issue is prefixed with `pipe <path>: `.
fn map_pipe_load_error(e: pipe::LoadError, pipe_path: &Path) -> CliError {
    let shown = pipe_path.display();
    let issues: Vec<String> = match e {
        pipe::LoadError::Parse(perr) => vec![format!("pipe {}: {perr}", shown)],
        pipe::LoadError::Invalid(errs) => {
            let mut collected = Vec::new();
            for er in errs {
                collected.push(format!("pipe {}: {er}", shown));
            }
            collected
        }
    };
    CliError::ValidationFailed { issues }
}
/// Reads `distro_version` from the frontmatter of `omne.md` in the volume's
/// omne directory.
///
/// Best effort: returns an empty string when the file cannot be read or its
/// frontmatter does not parse.
fn read_distro_version(root: &Path) -> String {
    let manifest_path = volume::omne_dir(root).join("omne.md");
    std::fs::read_to_string(&manifest_path)
        .ok()
        .and_then(|text| {
            manifest::parse_frontmatter(&text)
                .ok()
                .map(|fm| fm.distro_version)
        })
        .unwrap_or_default()
}
/// Generates a fresh ULID and returns its text form in ASCII lowercase.
fn new_event_id() -> String {
    let mut id = Ulid::new().to_string();
    id.make_ascii_lowercase();
    id
}
/// Returns the current UTC time from `clock::now_utc`, rendered with its
/// `format_iso_utc` formatter.
fn iso_utc_now() -> String {
    let now = clock::now_utc();
    now.format_iso_utc()
}
/// Removes a leading `\\?\` verbatim prefix from a Windows path.
///
/// Paths of the form `\\?\UNC\...` are returned unchanged, as is any path
/// that does not start with `\\?\`.
///
/// A path that is not valid Unicode is also returned unchanged. Going
/// through a lossy string conversion would replace the ill-formed parts with
/// U+FFFD and hand back a path that names a different file.
#[cfg(windows)]
fn strip_unc_prefix(p: std::path::PathBuf) -> std::path::PathBuf {
    let stripped = p
        .to_str()
        .and_then(|s| s.strip_prefix(r"\\?\"))
        .filter(|rest| !rest.starts_with("UNC\\"))
        .map(std::path::PathBuf::from);
    stripped.unwrap_or(p)
}

/// No-op on non-Windows targets: the path is returned unchanged.
#[cfg(not(windows))]
fn strip_unc_prefix(p: std::path::PathBuf) -> std::path::PathBuf {
    p
}