Skip to main content

omne_cli/commands/
run.rs

1//! `omne run <pipe> [--input k=v ...]` — end-to-end pipe runner.
2//!
3//! Stitches Units 4–11 together: pipe load + input validation, ULID
4//! allocation, worktree creation, per-run event log, DAG scheduling,
5//! node dispatch, terminal `pipe.completed` / `pipe.aborted`.
6//!
7//! Transactional state lifecycle (plan HLD):
//! 1. Path-length preflight — errors leave no state.
//! 2. Load + validate pipe against volume, then run the
//!    `claude --version` preflight only if the pipe has AI-bearing nodes.
10//! 3. Parse `--input` against pipe `inputs:` schema — errors leave no state.
11//! 4. Allocate ULID → print `run_id=<value>` as first synchronous stdout
12//!    line.
13//! 5. Create detached-HEAD worktree.
14//! 6. Open `events.jsonl`. On failure, teardown the worktree so every
15//!    live worktree has a live run dir.
16//! 7. Emit `pipe.started` + dispatch loop + emit `pipe.completed` or
17//!    `pipe.aborted`.
18//!
19//! Worktree is **not** auto-removed on successful runs in v1 — users
20//! reclaim via `git worktree remove .omne/wt/<run_id>`. See plan "Deferred
21//! to Separate Tasks: `omne cleanup`".
22
23// Test-seam function `run_at_root` is called from the integration test
24// suite via lib.rs but not from main.rs; suppress the bin-crate warning.
25#![allow(dead_code)]
26
27use std::collections::BTreeMap;
28use std::io::{self, Write};
29use std::path::Path;
30
31use clap::Args as ClapArgs;
32use ulid::Ulid;
33
34use crate::claude_proc;
35use crate::clock;
36use crate::dag::{NodeOutcome as DagOutcome, NodeState, Scheduler};
37use crate::error::CliError;
38use crate::event_log::EventLog;
39use crate::events::{Event, Input, PipeAborted, PipeCompleted, PipeStarted};
40use crate::executor::{self, ExecutorContext, NodeOutcome};
41use crate::manifest;
42use crate::pipe::{self, Pipe};
43use crate::ulid as ulid_alloc;
44use crate::volume;
45use crate::worktree::{self, RemoveMode};
46
47/// Arguments for `omne run`.
48#[derive(Debug, ClapArgs)]
49pub struct Args {
50    /// Pipe name — resolved to `.omne/dist/pipes/<name>.md`.
51    pub pipe: String,
52
53    /// `--input key=value` pair. Repeatable. Validated against the pipe's
54    /// `inputs:` schema during preflight.
55    #[arg(long = "input", value_name = "KEY=VALUE")]
56    pub input: Vec<String>,
57}
58
59pub fn run(args: &Args) -> Result<(), CliError> {
60    let cwd = std::env::current_dir()
61        .map_err(|e| CliError::Io(format!("cannot determine current directory: {e}")))?;
62    run_at_root(&cwd, args, &mut io::stdout(), None)
63}
64
65/// Test seam: run against an arbitrary starting directory with an
66/// injectable stdout sink and optional `claude` binary override.
67pub fn run_at_root(
68    start: &Path,
69    args: &Args,
70    stdout: &mut dyn Write,
71    claude_bin: Option<&Path>,
72) -> Result<(), CliError> {
73    // `find_omne_root` canonicalizes, which on Windows yields a
74    // `\\?\C:\...` extended-length path. `git worktree add` refuses
75    // those with a cryptic "could not create leading directories"
76    // error. Normalize back to a regular absolute path before anything
77    // else touches the root.
78    let root = strip_unc_prefix(volume::find_omne_root(start).ok_or(CliError::NotAVolume)?);
79
80    // Preflight — no state written on failure.
81    worktree::preflight_volume_path_length(&root)?;
82
83    // Pipe load + structural/volume validation.
84    let pipe_path = volume::dist_dir(&root)
85        .join("pipes")
86        .join(format!("{}.md", args.pipe));
87    let pipe_def = pipe::load(&pipe_path, &root).map_err(|e| map_pipe_load_error(e, &pipe_path))?;
88
89    // Claude preflight only if the pipe has AI-bearing nodes.
90    if pipe_def.needs_claude() {
91        claude_proc::preflight(claude_bin)?;
92    }
93
94    // Inputs preflight — errors leave no state.
95    let inputs = resolve_inputs(&pipe_def, &args.input)?;
96
97    // Mint run_id.
98    let ulid_value = ulid_alloc::allocate(&volume::ulid_lock_path(&root))?;
99    let run_id = format!(
100        "{}-{}",
101        pipe_def.name,
102        ulid_value.to_string().to_ascii_lowercase()
103    );
104
105    // `run_id=<value>` MUST be the first synchronous stdout line (plan
106    // R13) so an agent spawning `omne run` as a long-running subprocess
107    // can correlate stdout with `events.jsonl` before anything else
108    // lands.
109    writeln!(stdout, "run_id={run_id}")
110        .map_err(|e| CliError::Io(format!("stdout write failed: {e}")))?;
111    stdout
112        .flush()
113        .map_err(|e| CliError::Io(format!("stdout flush failed: {e}")))?;
114
115    // Worktree first, then event log. If log open fails, tear the
116    // worktree down so the invariant "every live worktree has a live
117    // run dir" holds.
118    let wt_path = worktree::create(&root, &run_id)?;
119    let log = match EventLog::for_run(&root, &run_id) {
120        Ok(l) => l,
121        Err(e) => {
122            let _ = worktree::remove(&root, &run_id, RemoveMode::Force);
123            return Err(e.into());
124        }
125    };
126
127    // Pipe started. `distro_version` best-effort: a fresh-test volume
128    // may skip `omne.md`, and an unreadable/malformed manifest is not
129    // a reason to fail a run — the event field is
130    // `skip_serializing_if = "String::is_empty"` so the empty case
131    // round-trips cleanly.
132    let distro_version = read_distro_version(&root);
133    log.append(&Event::PipeStarted(PipeStarted {
134        id: new_event_id(),
135        ts: iso_utc_now(),
136        run_id: run_id.clone(),
137        pipe: pipe_def.name.clone(),
138        inputs: inputs.clone(),
139        distro_version,
140    }))?;
141
142    // Executor context. Defaults for node / gate timeouts come from
143    // `ExecutorContext::new`; `default_model` + `claude_bin` are the
144    // only per-run overrides the pipe surface gives us.
145    let mut ctx = ExecutorContext::new(&root, &run_id, &wt_path, &log, &inputs);
146    ctx.default_model = pipe_def.default_model.as_deref();
147    ctx.claude_bin = claude_bin;
148
149    // DAG loop.
150    let terminal = run_scheduler(&pipe_def, &ctx)?;
151
152    // Emit terminal event + map to CLI exit semantics.
153    match terminal {
154        TerminalOutcome::Completed => {
155            log.append(&Event::PipeCompleted(PipeCompleted {
156                id: new_event_id(),
157                ts: iso_utc_now(),
158                run_id: run_id.clone(),
159            }))?;
160            Ok(())
161        }
162        TerminalOutcome::Aborted { reason } => {
163            log.append(&Event::PipeAborted(PipeAborted {
164                id: new_event_id(),
165                ts: iso_utc_now(),
166                run_id: run_id.clone(),
167                reason: reason.clone(),
168            }))?;
169            Err(CliError::PipeAborted { run_id, reason })
170        }
171    }
172}
173
174// ── Helpers ────────────────────────────────────────────────────────
175
/// Pipe-level outcome of the scheduler loop; `run_at_root` maps it to the
/// terminal event (`pipe.completed` / `pipe.aborted`) and the CLI result.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TerminalOutcome {
    /// No node ended in a failed, blocked, or unscheduled state.
    Completed,
    /// The run did not succeed; `reason` names the offending nodes.
    Aborted { reason: String },
}
181
182/// Single-threaded scheduler loop: dispatch each newly-ready node
183/// immediately, feed the outcome back to `Scheduler::mark`, repeat until
184/// terminal. Sequential dispatch is deliberate for v1 — the scheduler
185/// itself supports parallel fanout, but concurrent subprocess
186/// orchestration (spawn/wait coordination, cross-node capture ordering)
187/// is deferred. Fanout-shaped pipes still produce the correct
188/// happens-before relationships via `depends_on` + `trigger_rule`.
189fn run_scheduler<'a>(
190    pipe_def: &'a Pipe,
191    ctx: &ExecutorContext<'a>,
192) -> Result<TerminalOutcome, CliError> {
193    let mut scheduler = Scheduler::new(pipe_def);
194    let nodes_by_id: BTreeMap<&str, &pipe::Node> =
195        pipe_def.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
196
197    while !scheduler.is_terminal() {
198        let ready = scheduler.ready();
199        if ready.is_empty() {
200            // No ready nodes but not terminal: deadlocked. Cascade-block
201            // all remaining Pending nodes so the abort reasoning includes
202            // them.
203            for node in &pipe_def.nodes {
204                if scheduler.state(&node.id) == Some(NodeState::Pending) {
205                    let _ = scheduler.mark(&node.id, DagOutcome::Failed);
206                }
207            }
208            break;
209        }
210        for node_id in ready {
211            let node = nodes_by_id
212                .get(node_id.as_str())
213                .expect("scheduler returned id not present in pipe");
214            let outcome = executor::dispatch(node, ctx)?;
215            let dag_outcome = match outcome {
216                NodeOutcome::Completed => DagOutcome::Completed,
217                NodeOutcome::Failed { .. } => DagOutcome::Failed,
218            };
219            // Ignore the newly-unblocked set; the next `ready()` call
220            // re-derives it from current states.
221            let _ = scheduler
222                .mark(&node_id, dag_outcome)
223                .map_err(|e| CliError::Io(format!("scheduler mark error: {e}")))?;
224        }
225    }
226
227    // Every node terminal. Any Failed or Blocked → pipe.aborted.
228    let mut failed: Vec<&str> = Vec::new();
229    let mut blocked: Vec<&str> = Vec::new();
230    for id in pipe_def.nodes.iter().map(|n| n.id.as_str()) {
231        match scheduler.state(id) {
232            Some(NodeState::Failed) => failed.push(id),
233            Some(NodeState::Blocked) => blocked.push(id),
234            _ => {}
235        }
236    }
237    if failed.is_empty() && blocked.is_empty() {
238        return Ok(TerminalOutcome::Completed);
239    }
240    let mut reason = String::new();
241    if !failed.is_empty() {
242        reason.push_str(&format!("failed nodes: {}", failed.join(", ")));
243    }
244    if !blocked.is_empty() {
245        if !reason.is_empty() {
246            reason.push_str("; ");
247        }
248        reason.push_str(&format!("blocked nodes: {}", blocked.join(", ")));
249    }
250    Ok(TerminalOutcome::Aborted { reason })
251}
252
253/// Parse `--input` args and validate against the pipe schema. Produces
254/// the ordered `Input` list emitted on `pipe.started` and threaded to
255/// every node as `OMNE_INPUT_<KEY>`.
256///
257/// Rules:
258/// - `--input key=value`: unknown keys (not in `pipe.inputs`) rejected.
259/// - Required inputs must be present.
260/// - Missing optional inputs with a `default:` fall back to that default.
261/// - Missing optional inputs with no default are simply omitted.
262fn resolve_inputs(pipe_def: &Pipe, raw: &[String]) -> Result<Vec<Input>, CliError> {
263    let mut provided: BTreeMap<String, String> = BTreeMap::new();
264    let mut issues: Vec<String> = Vec::new();
265
266    for entry in raw {
267        let Some((key, value)) = entry.split_once('=') else {
268            issues.push(format!(
269                "--input {entry:?} must be `key=value` (no `=` found)"
270            ));
271            continue;
272        };
273        let key = key.to_string();
274        if !pipe_def.inputs.contains_key(&key) {
275            issues.push(format!(
276                "--input {key:?} is not declared in pipe `{}` inputs:",
277                pipe_def.name
278            ));
279            continue;
280        }
281        if provided.insert(key.clone(), value.to_string()).is_some() {
282            issues.push(format!("--input {key:?} provided more than once"));
283        }
284    }
285
286    let mut resolved: Vec<Input> = Vec::new();
287    for (name, spec) in &pipe_def.inputs {
288        match provided.get(name) {
289            Some(value) => resolved.push(Input {
290                key: name.clone(),
291                value: value.clone(),
292            }),
293            None => {
294                if let Some(default) = &spec.default {
295                    resolved.push(Input {
296                        key: name.clone(),
297                        value: default.clone(),
298                    });
299                } else if spec.required {
300                    issues.push(format!(
301                        "--input {name:?} is required by pipe `{}` but was not provided",
302                        pipe_def.name
303                    ));
304                }
305            }
306        }
307    }
308
309    if issues.is_empty() {
310        Ok(resolved)
311    } else {
312        Err(CliError::ValidationFailed { issues })
313    }
314}
315
316fn map_pipe_load_error(e: pipe::LoadError, pipe_path: &Path) -> CliError {
317    match e {
318        pipe::LoadError::Parse(perr) => CliError::ValidationFailed {
319            issues: vec![format!("pipe {}: {perr}", pipe_path.display())],
320        },
321        pipe::LoadError::Invalid(errs) => CliError::ValidationFailed {
322            issues: errs
323                .into_iter()
324                .map(|er| format!("pipe {}: {er}", pipe_path.display()))
325                .collect(),
326        },
327    }
328}
329
330/// Best-effort read of `distro-version` from `.omne/omne.md`. Returns
331/// an empty string when the manifest is missing or malformed so the
332/// pipe can still run in test or partial volumes. Full validation
333/// lives in `omne validate` (Unit 14).
334fn read_distro_version(root: &Path) -> String {
335    let readme = volume::omne_dir(root).join("omne.md");
336    let Ok(content) = std::fs::read_to_string(&readme) else {
337        return String::new();
338    };
339    match manifest::parse_frontmatter(&content) {
340        Ok(fm) => fm.distro_version,
341        Err(_) => String::new(),
342    }
343}
344
345fn new_event_id() -> String {
346    Ulid::new().to_string().to_ascii_lowercase()
347}
348
349fn iso_utc_now() -> String {
350    clock::now_utc().format_iso_utc()
351}
352
/// On Windows, strip the `\\?\` extended-length (verbatim) prefix from a
/// drive-letter path so downstream callers (notably `git worktree add`)
/// accept it: `\\?\C:\vol` becomes `C:\vol`.
///
/// Every other path is returned unchanged, including verbatim UNC shares
/// (`\\?\UNC\server\share`), volume-GUID paths (`\\?\Volume{…}\…`), and
/// paths that are not valid UTF-8. None of those has a plain drive-letter
/// spelling, and a lossy string round-trip would corrupt non-UTF-8 paths.
#[cfg(windows)]
fn strip_unc_prefix(p: std::path::PathBuf) -> std::path::PathBuf {
    if let Some(plain) = p.to_str().and_then(verbatim_disk_remainder) {
        return std::path::PathBuf::from(plain);
    }
    p
}

/// Non-Windows paths never carry a verbatim prefix; identity.
#[cfg(not(windows))]
fn strip_unc_prefix(p: std::path::PathBuf) -> std::path::PathBuf {
    p
}

/// If `s` is a verbatim drive-root path (`\\?\X:\...`), return the part
/// after the `\\?\` prefix; otherwise `None`. Kept platform-independent so
/// the parsing rule can be exercised on any host.
#[cfg_attr(not(windows), allow(dead_code))]
fn verbatim_disk_remainder(s: &str) -> Option<&str> {
    let rest = s.strip_prefix(r"\\?\")?;
    match rest.as_bytes() {
        // Require the `\` after the colon: a bare `X:` is drive-relative,
        // not the drive root, so stripping it would change the meaning.
        [drive, b':', b'\\', ..] if drive.is_ascii_alphabetic() => Some(rest),
        _ => None,
    }
}