1#![allow(dead_code)]
26
27use std::collections::BTreeMap;
28use std::io::{self, Write};
29use std::path::Path;
30
31use clap::Args as ClapArgs;
32use ulid::Ulid;
33
34use crate::claude_proc;
35use crate::clock;
36use crate::dag::{NodeOutcome as DagOutcome, NodeState, Scheduler};
37use crate::error::CliError;
38use crate::event_log::EventLog;
39use crate::events::{Event, Input, PipeAborted, PipeCompleted, PipeStarted};
40use crate::executor::{self, ExecutorContext, NodeOutcome};
41use crate::manifest;
42use crate::pipe::{self, Pipe};
43use crate::ulid as ulid_alloc;
44use crate::volume;
45use crate::worktree::{self, RemoveMode};
46
47#[derive(Debug, ClapArgs)]
49pub struct Args {
50 pub pipe: String,
52
53 #[arg(long = "input", value_name = "KEY=VALUE")]
56 pub input: Vec<String>,
57}
58
59pub fn run(args: &Args) -> Result<(), CliError> {
60 let cwd = std::env::current_dir()
61 .map_err(|e| CliError::Io(format!("cannot determine current directory: {e}")))?;
62 run_at_root(&cwd, args, &mut io::stdout(), None)
63}
64
65pub fn run_at_root(
68 start: &Path,
69 args: &Args,
70 stdout: &mut dyn Write,
71 claude_bin: Option<&Path>,
72) -> Result<(), CliError> {
73 let root = strip_unc_prefix(volume::find_omne_root(start).ok_or(CliError::NotAVolume)?);
79
80 worktree::preflight_volume_path_length(&root)?;
82
83 let pipe_path = volume::dist_dir(&root)
85 .join("pipes")
86 .join(format!("{}.md", args.pipe));
87 let pipe_def = pipe::load(&pipe_path, &root).map_err(|e| map_pipe_load_error(e, &pipe_path))?;
88
89 if pipe_def.needs_claude() {
91 claude_proc::preflight(claude_bin)?;
92 }
93
94 let inputs = resolve_inputs(&pipe_def, &args.input)?;
96
97 let ulid_value = ulid_alloc::allocate(&volume::ulid_lock_path(&root))?;
99 let run_id = format!(
100 "{}-{}",
101 pipe_def.name,
102 ulid_value.to_string().to_ascii_lowercase()
103 );
104
105 writeln!(stdout, "run_id={run_id}")
110 .map_err(|e| CliError::Io(format!("stdout write failed: {e}")))?;
111 stdout
112 .flush()
113 .map_err(|e| CliError::Io(format!("stdout flush failed: {e}")))?;
114
115 let wt_path = worktree::create(&root, &run_id)?;
119 let log = match EventLog::for_run(&root, &run_id) {
120 Ok(l) => l,
121 Err(e) => {
122 let _ = worktree::remove(&root, &run_id, RemoveMode::Force);
123 return Err(e.into());
124 }
125 };
126
127 let distro_version = read_distro_version(&root);
133 log.append(&Event::PipeStarted(PipeStarted {
134 id: new_event_id(),
135 ts: iso_utc_now(),
136 run_id: run_id.clone(),
137 pipe: pipe_def.name.clone(),
138 inputs: inputs.clone(),
139 distro_version,
140 }))?;
141
142 let mut ctx = ExecutorContext::new(&root, &run_id, &wt_path, &log, &inputs);
146 ctx.default_model = pipe_def.default_model.as_deref();
147 ctx.claude_bin = claude_bin;
148
149 let terminal = run_scheduler(&pipe_def, &ctx)?;
151
152 match terminal {
154 TerminalOutcome::Completed => {
155 log.append(&Event::PipeCompleted(PipeCompleted {
156 id: new_event_id(),
157 ts: iso_utc_now(),
158 run_id: run_id.clone(),
159 }))?;
160 Ok(())
161 }
162 TerminalOutcome::Aborted { reason } => {
163 log.append(&Event::PipeAborted(PipeAborted {
164 id: new_event_id(),
165 ts: iso_utc_now(),
166 run_id: run_id.clone(),
167 reason: reason.clone(),
168 }))?;
169 Err(CliError::PipeAborted { run_id, reason })
170 }
171 }
172}
173
174enum TerminalOutcome {
178 Completed,
179 Aborted { reason: String },
180}
181
182fn run_scheduler<'a>(
190 pipe_def: &'a Pipe,
191 ctx: &ExecutorContext<'a>,
192) -> Result<TerminalOutcome, CliError> {
193 let mut scheduler = Scheduler::new(pipe_def);
194 let nodes_by_id: BTreeMap<&str, &pipe::Node> =
195 pipe_def.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
196
197 while !scheduler.is_terminal() {
198 let ready = scheduler.ready();
199 if ready.is_empty() {
200 for node in &pipe_def.nodes {
204 if scheduler.state(&node.id) == Some(NodeState::Pending) {
205 let _ = scheduler.mark(&node.id, DagOutcome::Failed);
206 }
207 }
208 break;
209 }
210 for node_id in ready {
211 let node = nodes_by_id
212 .get(node_id.as_str())
213 .expect("scheduler returned id not present in pipe");
214 let outcome = executor::dispatch(node, ctx)?;
215 let dag_outcome = match outcome {
216 NodeOutcome::Completed => DagOutcome::Completed,
217 NodeOutcome::Failed { .. } => DagOutcome::Failed,
218 };
219 let _ = scheduler
222 .mark(&node_id, dag_outcome)
223 .map_err(|e| CliError::Io(format!("scheduler mark error: {e}")))?;
224 }
225 }
226
227 let mut failed: Vec<&str> = Vec::new();
229 let mut blocked: Vec<&str> = Vec::new();
230 for id in pipe_def.nodes.iter().map(|n| n.id.as_str()) {
231 match scheduler.state(id) {
232 Some(NodeState::Failed) => failed.push(id),
233 Some(NodeState::Blocked) => blocked.push(id),
234 _ => {}
235 }
236 }
237 if failed.is_empty() && blocked.is_empty() {
238 return Ok(TerminalOutcome::Completed);
239 }
240 let mut reason = String::new();
241 if !failed.is_empty() {
242 reason.push_str(&format!("failed nodes: {}", failed.join(", ")));
243 }
244 if !blocked.is_empty() {
245 if !reason.is_empty() {
246 reason.push_str("; ");
247 }
248 reason.push_str(&format!("blocked nodes: {}", blocked.join(", ")));
249 }
250 Ok(TerminalOutcome::Aborted { reason })
251}
252
253fn resolve_inputs(pipe_def: &Pipe, raw: &[String]) -> Result<Vec<Input>, CliError> {
263 let mut provided: BTreeMap<String, String> = BTreeMap::new();
264 let mut issues: Vec<String> = Vec::new();
265
266 for entry in raw {
267 let Some((key, value)) = entry.split_once('=') else {
268 issues.push(format!(
269 "--input {entry:?} must be `key=value` (no `=` found)"
270 ));
271 continue;
272 };
273 let key = key.to_string();
274 if !pipe_def.inputs.contains_key(&key) {
275 issues.push(format!(
276 "--input {key:?} is not declared in pipe `{}` inputs:",
277 pipe_def.name
278 ));
279 continue;
280 }
281 if provided.insert(key.clone(), value.to_string()).is_some() {
282 issues.push(format!("--input {key:?} provided more than once"));
283 }
284 }
285
286 let mut resolved: Vec<Input> = Vec::new();
287 for (name, spec) in &pipe_def.inputs {
288 match provided.get(name) {
289 Some(value) => resolved.push(Input {
290 key: name.clone(),
291 value: value.clone(),
292 }),
293 None => {
294 if let Some(default) = &spec.default {
295 resolved.push(Input {
296 key: name.clone(),
297 value: default.clone(),
298 });
299 } else if spec.required {
300 issues.push(format!(
301 "--input {name:?} is required by pipe `{}` but was not provided",
302 pipe_def.name
303 ));
304 }
305 }
306 }
307 }
308
309 if issues.is_empty() {
310 Ok(resolved)
311 } else {
312 Err(CliError::ValidationFailed { issues })
313 }
314}
315
316fn map_pipe_load_error(e: pipe::LoadError, pipe_path: &Path) -> CliError {
317 match e {
318 pipe::LoadError::Parse(perr) => CliError::ValidationFailed {
319 issues: vec![format!("pipe {}: {perr}", pipe_path.display())],
320 },
321 pipe::LoadError::Invalid(errs) => CliError::ValidationFailed {
322 issues: errs
323 .into_iter()
324 .map(|er| format!("pipe {}: {er}", pipe_path.display()))
325 .collect(),
326 },
327 }
328}
329
330fn read_distro_version(root: &Path) -> String {
335 let readme = volume::omne_dir(root).join("omne.md");
336 let Ok(content) = std::fs::read_to_string(&readme) else {
337 return String::new();
338 };
339 match manifest::parse_frontmatter(&content) {
340 Ok(fm) => fm.distro_version,
341 Err(_) => String::new(),
342 }
343}
344
345fn new_event_id() -> String {
346 Ulid::new().to_string().to_ascii_lowercase()
347}
348
349fn iso_utc_now() -> String {
350 clock::now_utc().format_iso_utc()
351}
352
353#[cfg(windows)]
358fn strip_unc_prefix(p: std::path::PathBuf) -> std::path::PathBuf {
359 let s = p.to_string_lossy();
360 if let Some(rest) = s.strip_prefix(r"\\?\") {
361 if !rest.starts_with("UNC\\") {
362 return std::path::PathBuf::from(rest.to_string());
363 }
364 }
365 p
366}
367
368#[cfg(not(windows))]
369fn strip_unc_prefix(p: std::path::PathBuf) -> std::path::PathBuf {
370 p
371}