Skip to main content

harmont_cli/commands/run/
local.rs

1use anyhow::{Context, Result};
2
3use super::render::{ToolPaths, list_pipelines, render_pipeline_json};
4use crate::cli::RunArgs;
5use crate::context::RunContext;
6use crate::output::format::banner;
7
8fn decode_plan_to_wire(bytes: &[u8]) -> anyhow::Result<hm_plugin_protocol::Pipeline> {
9    serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!("decode pipeline JSON: {e}"))
10}
11
12/// Run a pipeline locally via Docker.
13///
14/// # Errors
15///
16/// Returns an error if the working directory cannot be resolved, no
17/// pipeline slug was given when more than one is declared (or none are
18/// declared), the Python DSL transpile or Scheme evaluator step fails,
19/// the resulting plan does not decode, the Docker daemon is unreachable,
20/// or the orchestrator surfaces an internal scheduler error. Non-zero
21/// step exit codes are returned as the `i32`, not as an Err.
22pub async fn handle(args: RunArgs, _ctx: RunContext) -> Result<i32> {
23    let repo_root = match args.dir.clone() {
24        Some(p) => p,
25        None => std::env::current_dir().context("cannot determine current directory")?,
26    };
27
28    let tools = ToolPaths::discover()?;
29
30    let slug = if let Some(s) = &args.pipeline {
31        s.clone()
32    } else {
33        let metas = list_pipelines(&tools, &repo_root).await?;
34        let slugs: Vec<String> = metas.into_iter().map(|m| m.slug).collect();
35        match slugs.as_slice() {
36            [only] => only.clone(),
37            [] => anyhow::bail!(
38                "no pipelines declared in this repo\n  \
39                 hint: define one with `@hm.pipeline(\"slug\")` in `.harmont/pipeline.py`"
40            ),
41            many => anyhow::bail!(
42                "this repo declares pipelines: {}\n  → pass one as the first argument",
43                many.join(", ")
44            ),
45        }
46    };
47
48    if args.format == "human" {
49        banner("run --local", &format!("slug={slug}"));
50    }
51
52    let json = render_pipeline_json(&tools, &repo_root, &slug).await?;
53    let pipeline_wire = decode_plan_to_wire(&json)?;
54    let parallelism = args.parallelism.unwrap_or_else(|| {
55        std::thread::available_parallelism().map_or(4, std::num::NonZeroUsize::get)
56    });
57    let exit_code =
58        crate::orchestrator::run(pipeline_wire, repo_root, parallelism, args.format.clone())
59            .await?;
60    Ok(exit_code)
61}