omne-cli 0.2.0

CLI for managing omne volumes: init, upgrade, and validate kernel and distro releases
Documentation
//! `omne run <pipe> [--input k=v ...]` — end-to-end pipe runner.
//!
//! Stitches Units 4–11 together: pipe load + input validation, ULID
//! allocation, worktree creation, per-run event log, DAG scheduling,
//! node dispatch, terminal `pipe.completed` / `pipe.aborted`.
//!
//! Transactional state lifecycle (plan HLD):
//! 1. Path-length + `claude --version` preflight — errors leave no state.
//! 2. Load + validate pipe against volume.
//! 3. Parse `--input` against pipe `inputs:` schema — errors leave no state.
//! 4. Allocate ULID → print `run_id=<value>` as first synchronous stdout
//!    line.
//! 5. Create detached-HEAD worktree.
//! 6. Open `events.jsonl`. On failure, tear down the worktree so every
//!    live worktree has a live run dir.
//! 7. Emit `pipe.started` + dispatch loop + emit `pipe.completed` or
//!    `pipe.aborted`.
//!
//! Worktree is **not** auto-removed on successful runs in v1 — users
//! reclaim via `git worktree remove .omne/wt/<run_id>`. See plan "Deferred
//! to Separate Tasks: `omne cleanup`".

// Test-seam function `run_at_root` is called from the integration test
// suite via lib.rs but not from main.rs; suppress the bin-crate warning.
#![allow(dead_code)]

use std::collections::BTreeMap;
use std::io::{self, Write};
use std::path::Path;

use clap::Args as ClapArgs;
use ulid::Ulid;

use crate::claude_proc;
use crate::clock;
use crate::dag::{NodeOutcome as DagOutcome, NodeState, Scheduler};
use crate::error::CliError;
use crate::event_log::EventLog;
use crate::events::{Event, Input, PipeAborted, PipeCompleted, PipeStarted};
use crate::executor::{self, ExecutorContext, NodeOutcome};
use crate::manifest;
use crate::pipe::{self, Pipe};
use crate::ulid as ulid_alloc;
use crate::volume;
use crate::worktree::{self, RemoveMode};

/// Arguments for `omne run`.
// NOTE: clap's derive turns the `///` field doc comments below into the
// `--help` text for `omne run`, so editing them changes user-visible CLI
// output. Implementation notes belong in `//` comments like this one.
#[derive(Debug, ClapArgs)]
pub struct Args {
    /// Pipe name — resolved to `.omne/dist/pipes/<name>.md`.
    // No `#[arg]` attribute, so clap treats this as a positional argument.
    // `run_at_root` joins it into the path as `<pipe>.md`.
    pub pipe: String,

    /// `--input key=value` pair. Repeatable. Validated against the pipe's
    /// `inputs:` schema during preflight.
    // Collected raw here; `resolve_inputs` splits on the first `=` and
    // rejects unknown, duplicate, or missing-required keys.
    #[arg(long = "input", value_name = "KEY=VALUE")]
    pub input: Vec<String>,
}

/// CLI entry point for `omne run`: starts volume discovery from the
/// process's current directory, writes to real stdout, and uses the
/// default `claude` binary lookup.
///
/// # Errors
/// `CliError::Io` when the current directory cannot be determined;
/// otherwise whatever `run_at_root` reports.
pub fn run(args: &Args) -> Result<(), CliError> {
    let start_dir = match std::env::current_dir() {
        Ok(dir) => dir,
        Err(e) => {
            return Err(CliError::Io(format!(
                "cannot determine current directory: {e}"
            )))
        }
    };
    let mut out = io::stdout();
    run_at_root(&start_dir, args, &mut out, None)
}

/// Test seam: run against an arbitrary starting directory with an
/// injectable stdout sink and optional `claude` binary override.
/// Test seam: run against an arbitrary starting directory with an
/// injectable stdout sink and optional `claude` binary override.
///
/// Performs the full transactional lifecycle:
/// 1. Preflight (path length, pipe load, optional `claude` check, inputs).
/// 2. Mint the run id and print `run_id=<id>` as the first stdout line.
/// 3. Create the worktree and open the event log.
/// 4. Run the DAG and append exactly one terminal event.
///
/// # Errors
/// - Preflight failures (`NotAVolume`, validation, `claude` preflight)
///   return before any state is written.
/// - A completed DAG with failed/blocked nodes returns
///   `CliError::PipeAborted`.
/// - Infrastructure errors from the scheduler loop are returned as-is,
///   after a best-effort `pipe.aborted` has been appended.
pub fn run_at_root(
    start: &Path,
    args: &Args,
    stdout: &mut dyn Write,
    claude_bin: Option<&Path>,
) -> Result<(), CliError> {
    // `find_omne_root` canonicalizes, which on Windows yields a
    // `\\?\C:\...` extended-length path. `git worktree add` refuses
    // those with a cryptic "could not create leading directories"
    // error. Normalize back to a regular absolute path before anything
    // else touches the root.
    let root = strip_unc_prefix(volume::find_omne_root(start).ok_or(CliError::NotAVolume)?);

    // Preflight — no state written on failure.
    worktree::preflight_volume_path_length(&root)?;

    // Pipe load + structural/volume validation.
    let pipe_path = volume::dist_dir(&root)
        .join("pipes")
        .join(format!("{}.md", args.pipe));
    let pipe_def = pipe::load(&pipe_path, &root).map_err(|e| map_pipe_load_error(e, &pipe_path))?;

    // Claude preflight only if the pipe has AI-bearing nodes.
    if pipe_def.needs_claude() {
        claude_proc::preflight(claude_bin)?;
    }

    // Inputs preflight — errors leave no state.
    let inputs = resolve_inputs(&pipe_def, &args.input)?;

    // Mint run_id.
    let ulid_value = ulid_alloc::allocate(&volume::ulid_lock_path(&root))?;
    let run_id = format!(
        "{}-{}",
        pipe_def.name,
        ulid_value.to_string().to_ascii_lowercase()
    );

    // `run_id=<value>` MUST be the first synchronous stdout line (plan
    // R13) so an agent spawning `omne run` as a long-running subprocess
    // can correlate stdout with `events.jsonl` before anything else
    // lands.
    writeln!(stdout, "run_id={run_id}")
        .map_err(|e| CliError::Io(format!("stdout write failed: {e}")))?;
    stdout
        .flush()
        .map_err(|e| CliError::Io(format!("stdout flush failed: {e}")))?;

    // Worktree first, then event log. If log open fails, tear the
    // worktree down so the invariant "every live worktree has a live
    // run dir" holds.
    let wt_path = worktree::create(&root, &run_id)?;
    let log = match EventLog::for_run(&root, &run_id) {
        Ok(l) => l,
        Err(e) => {
            let _ = worktree::remove(&root, &run_id, RemoveMode::Force);
            return Err(e.into());
        }
    };

    // Pipe started. `distro_version` best-effort: a fresh-test volume
    // may skip `omne.md`, and an unreadable/malformed manifest is not
    // a reason to fail a run — the event field is
    // `skip_serializing_if = "String::is_empty"` so the empty case
    // round-trips cleanly.
    let distro_version = read_distro_version(&root);
    log.append(&Event::PipeStarted(PipeStarted {
        id: new_event_id(),
        ts: iso_utc_now(),
        run_id: run_id.clone(),
        pipe: pipe_def.name.clone(),
        inputs: inputs.clone(),
        distro_version,
    }))?;

    // Executor context. Defaults for node / gate timeouts come from
    // `ExecutorContext::new`; `default_model` + `claude_bin` are the
    // only per-run overrides the pipe surface gives us.
    let mut ctx = ExecutorContext::new(&root, &run_id, &wt_path, &log, &inputs);
    ctx.default_model = pipe_def.default_model.as_deref();
    ctx.claude_bin = claude_bin;

    // DAG loop. `run_scheduler` returns `Err` for failures that are not
    // node outcomes (a `CliError` from `executor::dispatch` or a rejected
    // `Scheduler::mark`). Record a best-effort `pipe.aborted` before
    // propagating so `events.jsonl` never ends after `pipe.started`
    // without a terminal event. A failure to append it is ignored: the
    // original error is the one worth reporting.
    let terminal = match run_scheduler(&pipe_def, &ctx) {
        Ok(outcome) => outcome,
        Err(err) => {
            let _ = log.append(&Event::PipeAborted(PipeAborted {
                id: new_event_id(),
                ts: iso_utc_now(),
                run_id: run_id.clone(),
                reason: format!("internal error: {err}"),
            }));
            return Err(err);
        }
    };

    // Emit terminal event + map to CLI exit semantics.
    match terminal {
        TerminalOutcome::Completed => {
            log.append(&Event::PipeCompleted(PipeCompleted {
                id: new_event_id(),
                ts: iso_utc_now(),
                run_id: run_id.clone(),
            }))?;
            Ok(())
        }
        TerminalOutcome::Aborted { reason } => {
            log.append(&Event::PipeAborted(PipeAborted {
                id: new_event_id(),
                ts: iso_utc_now(),
                run_id: run_id.clone(),
                reason: reason.clone(),
            }))?;
            Err(CliError::PipeAborted { run_id, reason })
        }
    }
}

// ── Helpers ────────────────────────────────────────────────────────

/// Pipe-level outcome fed back to the CLI exit mapping.
/// Pipe-level outcome fed back to the CLI exit mapping.
///
/// `Completed` maps to a `pipe.completed` event and a successful exit.
/// `Aborted` maps to a `pipe.aborted` event carrying `reason` and a
/// `CliError::PipeAborted`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TerminalOutcome {
    /// Every node finished without failing or being blocked.
    Completed,
    /// At least one node failed, was blocked, or never ran.
    /// `reason` is the human-readable summary written to the event log.
    Aborted { reason: String },
}

/// Single-threaded scheduler loop: dispatch each newly-ready node
/// immediately, feed the outcome back to `Scheduler::mark`, repeat until
/// terminal. Sequential dispatch is deliberate for v1 — the scheduler
/// itself supports parallel fanout, but concurrent subprocess
/// orchestration (spawn/wait coordination, cross-node capture ordering)
/// is deferred. Fanout-shaped pipes still produce the correct
/// happens-before relationships via `depends_on` + `trigger_rule`.
/// Single-threaded scheduler loop: dispatch each newly-ready node
/// immediately, feed the outcome back to `Scheduler::mark`, repeat until
/// terminal. Sequential dispatch is deliberate for v1 — the scheduler
/// itself supports parallel fanout, but concurrent subprocess
/// orchestration (spawn/wait coordination, cross-node capture ordering)
/// is deferred. Fanout-shaped pipes still produce the correct
/// happens-before relationships via `depends_on` + `trigger_rule`.
///
/// Returns `TerminalOutcome::Completed` only when the scheduler reached a
/// terminal state and no node is Failed or Blocked. Otherwise it returns
/// `Aborted` with a reason naming the offending nodes:
/// `failed nodes: ...; blocked nodes: ...; pending nodes: ...`.
/// Pending nodes are listed only when the loop stalled.
///
/// # Errors
/// Propagates errors from `executor::dispatch`. A rejected
/// `Scheduler::mark` for a dispatched node is reported as `CliError::Io`.
fn run_scheduler<'a>(
    pipe_def: &'a Pipe,
    ctx: &ExecutorContext<'a>,
) -> Result<TerminalOutcome, CliError> {
    let mut scheduler = Scheduler::new(pipe_def);
    let nodes_by_id: BTreeMap<&str, &pipe::Node> =
        pipe_def.nodes.iter().map(|n| (n.id.as_str(), n)).collect();

    while !scheduler.is_terminal() {
        let ready = scheduler.ready();
        if ready.is_empty() {
            // No ready nodes but not terminal: deadlocked. Mark every
            // remaining Pending node Failed so the abort reason names it
            // (and the scheduler can cascade-block its dependents). A
            // rejected mark is tolerated here; any node left Pending is
            // caught by the `stalled` check after the loop.
            for node in &pipe_def.nodes {
                if scheduler.state(&node.id) == Some(NodeState::Pending) {
                    let _ = scheduler.mark(&node.id, DagOutcome::Failed);
                }
            }
            break;
        }
        for node_id in ready {
            let node = nodes_by_id
                .get(node_id.as_str())
                .expect("scheduler returned id not present in pipe");
            let outcome = executor::dispatch(node, ctx)?;
            let dag_outcome = match outcome {
                NodeOutcome::Completed => DagOutcome::Completed,
                NodeOutcome::Failed { .. } => DagOutcome::Failed,
            };
            // Ignore the newly-unblocked set; the next `ready()` call
            // re-derives it from current states.
            let _ = scheduler
                .mark(&node_id, dag_outcome)
                .map_err(|e| CliError::Io(format!("scheduler mark error: {e}")))?;
        }
    }

    // The loop exits either because the scheduler became terminal or via
    // the deadlock `break`. In the latter case some node may never have
    // run, and the pipe must not be reported as completed.
    let stalled = !scheduler.is_terminal();

    let mut failed: Vec<&str> = Vec::new();
    let mut blocked: Vec<&str> = Vec::new();
    let mut pending: Vec<&str> = Vec::new();
    for id in pipe_def.nodes.iter().map(|n| n.id.as_str()) {
        match scheduler.state(id) {
            Some(NodeState::Failed) => failed.push(id),
            Some(NodeState::Blocked) => blocked.push(id),
            Some(NodeState::Pending) if stalled => pending.push(id),
            _ => {}
        }
    }

    // Join the non-empty categories in a fixed order: failed, then
    // blocked, then pending.
    let reason = [("failed", &failed), ("blocked", &blocked), ("pending", &pending)]
        .iter()
        .filter(|(_, ids)| !ids.is_empty())
        .map(|(label, ids)| format!("{label} nodes: {}", ids.join(", ")))
        .collect::<Vec<String>>()
        .join("; ");

    if reason.is_empty() && !stalled {
        return Ok(TerminalOutcome::Completed);
    }
    let reason = if reason.is_empty() {
        "scheduler stalled before every node reached a terminal state".to_string()
    } else {
        reason
    };
    Ok(TerminalOutcome::Aborted { reason })
}

/// Parse `--input` args and validate against the pipe schema. Produces
/// the ordered `Input` list emitted on `pipe.started` and threaded to
/// every node as `OMNE_INPUT_<KEY>`.
///
/// Rules:
/// - `--input key=value`: unknown keys (not in `pipe.inputs`) rejected.
/// - Required inputs must be present.
/// - Missing optional inputs with a `default:` fall back to that default.
/// - Missing optional inputs with no default are simply omitted.
/// Validate raw `--input key=value` arguments against the pipe's
/// `inputs:` schema. Returns the resolved `Input` list in the iteration
/// order of `pipe_def.inputs`. That list is what `pipe.started` records
/// and what nodes later receive as `OMNE_INPUT_<KEY>`.
///
/// Rules:
/// - An argument without `=` is rejected.
/// - Keys not declared in `pipe.inputs` are rejected.
/// - Supplying the same key more than once is rejected.
/// - A declared key that was not supplied falls back to its `default:`.
///   With no default it is an error if `required`, otherwise it is left
///   out of the result.
///
/// # Errors
/// `CliError::ValidationFailed` carrying every problem found, not just
/// the first.
fn resolve_inputs(pipe_def: &Pipe, raw: &[String]) -> Result<Vec<Input>, CliError> {
    let mut issues: Vec<String> = Vec::new();
    let mut supplied: BTreeMap<String, String> = BTreeMap::new();

    for entry in raw {
        let (key, value) = match entry.split_once('=') {
            Some((k, v)) => (k.to_string(), v.to_string()),
            None => {
                issues.push(format!(
                    "--input {entry:?} must be `key=value` (no `=` found)"
                ));
                continue;
            }
        };
        if !pipe_def.inputs.contains_key(&key) {
            issues.push(format!(
                "--input {key:?} is not declared in pipe `{}` inputs:",
                pipe_def.name
            ));
        } else if supplied.insert(key.clone(), value).is_some() {
            issues.push(format!("--input {key:?} provided more than once"));
        }
    }

    let mut resolved: Vec<Input> = Vec::new();
    for (name, spec) in &pipe_def.inputs {
        // An explicitly supplied value wins over the schema default.
        let chosen = supplied.get(name).or(spec.default.as_ref());
        match chosen {
            Some(value) => resolved.push(Input {
                key: name.clone(),
                value: value.clone(),
            }),
            None if spec.required => issues.push(format!(
                "--input {name:?} is required by pipe `{}` but was not provided",
                pipe_def.name
            )),
            None => {}
        }
    }

    if !issues.is_empty() {
        return Err(CliError::ValidationFailed { issues });
    }
    Ok(resolved)
}

/// Convert a `pipe::LoadError` into `CliError::ValidationFailed`.
///
/// Every message is prefixed with `pipe <path>: ` so the user can tell
/// which file failed. A parse error yields one issue; structural
/// validation yields one issue per reported error.
fn map_pipe_load_error(e: pipe::LoadError, pipe_path: &Path) -> CliError {
    let shown = pipe_path.display();
    let issues: Vec<String> = match e {
        pipe::LoadError::Parse(perr) => vec![format!("pipe {shown}: {perr}")],
        pipe::LoadError::Invalid(errs) => errs
            .into_iter()
            .map(|er| format!("pipe {shown}: {er}"))
            .collect(),
    };
    CliError::ValidationFailed { issues }
}

/// Best-effort read of `distro-version` from `.omne/omne.md`. Returns
/// an empty string when the manifest is missing or malformed so the
/// pipe can still run in test or partial volumes. Full validation
/// lives in `omne validate` (Unit 14).
/// Best-effort lookup of `distro-version` in the `.omne/omne.md`
/// manifest frontmatter.
///
/// Any failure, whether the file is missing, unreadable, or has bad
/// frontmatter, collapses to an empty string so runs still work in test
/// or partial volumes. Strict checking is left to `omne validate`
/// (Unit 14).
fn read_distro_version(root: &Path) -> String {
    let manifest_path = volume::omne_dir(root).join("omne.md");
    std::fs::read_to_string(&manifest_path)
        .ok()
        .and_then(|content| {
            manifest::parse_frontmatter(&content)
                .ok()
                .map(|fm| fm.distro_version)
        })
        .unwrap_or_default()
}

/// Fresh ULID for an event's `id` field, rendered in lowercase to match
/// the run-id formatting in `run_at_root`.
fn new_event_id() -> String {
    let mut id = Ulid::new().to_string();
    id.make_ascii_lowercase();
    id
}

/// Current time from `clock::now_utc`, formatted by its
/// `format_iso_utc` method, for event `ts` fields.
fn iso_utc_now() -> String {
    let now = clock::now_utc();
    now.format_iso_utc()
}

/// On Windows, strip the `\\?\` extended-length prefix so downstream
/// callers (notably `git worktree add`) accept the path. Preserves
/// genuine `\\?\UNC\server\share` paths unchanged — those are real UNC
/// shares, not drive-letter paths with the prefix.
/// On Windows, strip the `\\?\` verbatim prefix from drive-letter paths
/// (`\\?\C:\...`) so downstream callers (notably `git worktree add`)
/// accept them.
///
/// Only paths whose first component is `Prefix::VerbatimDisk` are
/// rewritten. Everything else is returned unchanged:
/// - genuine `\\?\UNC\server\share` paths, which are real UNC shares;
/// - other verbatim forms such as `\\?\Volume{GUID}\...`, where stripping
///   would leave a relative-looking path;
/// - non-verbatim paths;
/// - paths that are not valid Unicode, which a lossy conversion would
///   silently corrupt.
#[cfg(windows)]
fn strip_unc_prefix(p: std::path::PathBuf) -> std::path::PathBuf {
    use std::path::{Component, Prefix};

    let is_verbatim_disk = matches!(
        p.components().next(),
        Some(Component::Prefix(prefix)) if matches!(prefix.kind(), Prefix::VerbatimDisk(_))
    );
    if !is_verbatim_disk {
        return p;
    }
    // `to_str` is lossless: a non-Unicode path yields `None` and is kept
    // verbatim rather than rewritten with replacement characters.
    if let Some(rest) = p.to_str().and_then(|s| s.strip_prefix(r"\\?\")) {
        return std::path::PathBuf::from(rest);
    }
    p
}

/// Non-Windows counterpart of the verbatim-prefix stripper. Paths have no
/// `\\?\` form here, so the input is handed back untouched.
#[cfg(not(windows))]
fn strip_unc_prefix(path: std::path::PathBuf) -> std::path::PathBuf {
    path
}