Skip to main content

harmont_cli/orchestrator/
scheduler.rs

1//! Chain-bounded scheduler. Dispatches each step to its registered
2//! step-executor plugin (Docker by default) via the plugin host.
3
4// Pedantic-bucket nags accepted at module scope:
5// - `cast_possible_truncation`: every `as u64` here is a millisecond
6//   wall-clock duration; `u128 -> u64` cannot overflow for any
7//   conceivable build runtime (584 million years).
8// - `expect_used` on the semaphore: `acquire_owned` only errors if the
9//   semaphore is closed, which we never close.
10// - `too_many_lines` on `run`: the scheduler body is one cohesive
11//   loop; splitting it would obscure the spawn/join symmetry.
12// - `missing_panics_doc`: the only panic path is the semaphore expect
13//   described above; the function docstring already explains its
14//   error surface.
15#![allow(
16    clippy::cast_possible_truncation,
17    clippy::expect_used,
18    clippy::too_many_lines,
19    clippy::missing_panics_doc,
20    // `significant_drop_tightening`: the registry MutexGuard in the
21    // --format validation block is held only across constant-time
22    // hash-map lookups; the lint would have us scatter `drop(reg)`
23    // calls that add no clarity.
24    clippy::significant_drop_tightening
25)]
26
27use std::collections::{HashMap, HashSet};
28use std::path::PathBuf;
29use std::sync::Arc;
30use std::time::Instant;
31
32use anyhow::{Context, Result};
33use hm_plugin_protocol::{
34    ArchiveId, BuildEvent, ExecutorInput, PlanSummary, SnapshotRef, StepResult,
35};
36use tokio::sync::Mutex;
37use uuid::Uuid;
38
39use crate::error::HmError;
40use crate::orchestrator::docker_client::DockerClient;
41use crate::orchestrator::graph::Graph;
42use crate::orchestrator::source::build_archive_bytes;
43use crate::plugin::{PluginRegistry, RegistryConfig};
44
45use super::archive::ArchiveStore;
46use super::cache;
47use super::cancel::CancellationToken;
48use super::events::EventBus;
49use super::state::{self, OrchestratorState};
50
51/// Entry point: run a parsed pipeline locally end-to-end. Returns
52/// the overall exit code (0 = success, [`crate::error::EXIT_BUILD_FAILED`]
53/// when any step exited non-zero).
54///
55/// # Errors
56/// Returns an error if plugin discovery fails, the source archive
57/// cannot be built, the Docker daemon is unreachable, or any
58/// scheduler-level failure occurs. Non-zero step exit codes are
59/// surfaced via the returned `i32`, not as an Err.
60pub async fn run(
61    pipeline: hm_plugin_protocol::Pipeline,
62    repo_root: PathBuf,
63    parallelism: usize,
64    format_name: String,
65) -> Result<i32> {
66    // Build graph + chains directly from the wire-typed pipeline.
67    let graph = Graph::build(&pipeline).context("build graph")?;
68    let chains = graph.chains();
69    let chain_deps = graph.chain_deps(&chains);
70
71    // Set up per-run state.
72    let bus = EventBus::new();
73    let archives = ArchiveStore::new();
74    let cancel = CancellationToken::new();
75    let _ctrlc = crate::plugin::signal::install_ctrlc(cancel.clone());
76    // _ctrlc dropped at end of `run`; runtime tear-down kills the task.
77    let docker = DockerClient::connect()
78        .map_err(|e| HmError::Docker(format!("daemon unreachable — is Docker running? ({e})")))?;
79    docker
80        .ping()
81        .await
82        .map_err(|e| HmError::Docker(format!("daemon ping failed: {e}")))?;
83    let run_id = Uuid::new_v4();
84
85    // Build the source archive once.
86    let archive_bytes = build_archive_bytes(&repo_root).context("build source archive")?;
87    let archive_id = archives.register(archive_bytes);
88
89    // Install per-run state for host fns to read.
90    let state_arc = Arc::new(OrchestratorState {
91        event_bus: bus.clone(),
92        archives,
93        cancel: cancel.clone(),
94        docker: docker.clone(),
95        run_id,
96    });
97    state::install(state_arc.clone());
98
99    let parallelism = parallelism.max(1);
100
101    // Load the plugin registry with the embedded docker plugin.
102    // The docker runner's pool gets pre-sized to `parallelism` so
103    // concurrent chains can run truly in parallel rather than
104    // serialising on a single plugin instance.
105    let mut pool_sizes: std::collections::BTreeMap<String, usize> =
106        std::collections::BTreeMap::new();
107    pool_sizes.insert("docker".to_string(), parallelism);
108    let registry = Arc::new(Mutex::new(
109        PluginRegistry::load(RegistryConfig {
110            auto_discover: true,
111            extra_paths: vec![],
112            embedded: vec![
113                (
114                    "harmont-docker",
115                    crate::plugin::embedded::DOCKER_PLUGIN_WASM,
116                ),
117                (
118                    "harmont-output-human",
119                    crate::plugin::embedded::OUTPUT_HUMAN_PLUGIN_WASM,
120                ),
121                (
122                    "harmont-output-json",
123                    crate::plugin::embedded::OUTPUT_JSON_PLUGIN_WASM,
124                ),
125            ],
126            pool_sizes,
127        })
128        .context("load plugin registry")?,
129    ));
130
131    // Validate the requested output format BEFORE emitting BuildStart
132    // so an invalid `--format` fails fast without producing any output.
133    // We materialise the available list under the lock and then drop
134    // the guard before the (rare) bail to satisfy
135    // `clippy::significant_drop_tightening`.
136    let bad_format: Option<Vec<String>> = {
137        let reg = registry.lock().await;
138        if reg.output_formatter_index.contains_key(&format_name) {
139            None
140        } else {
141            let mut names: Vec<String> = reg.output_formatter_index.keys().cloned().collect();
142            names.sort();
143            Some(names)
144        }
145    };
146    if let Some(available) = bad_format {
147        anyhow::bail!(
148            "unknown --format '{format_name}'; available: {}",
149            available.join(", ")
150        );
151    }
152
153    let semaphore = Arc::new(tokio::sync::Semaphore::new(parallelism));
154
155    // Cross-chain snapshot lineage. When a step completes, we stash
156    // its `committed_snapshot` under its node index. A fork-child
157    // chain looks up its `builds_in` parent here to know what base
158    // image to boot from. Mirrors legacy `SharedState::node_image`.
159    let node_image: Arc<Mutex<HashMap<usize, SnapshotRef>>> = Arc::new(Mutex::new(HashMap::new()));
160
161    // Spawn the output subscriber. Dispatches every BuildEvent to the
162    // selected output-formatter plugin (default: `human`).
163    let sink_handle =
164        super::output_subscriber::spawn(bus.clone(), registry.clone(), format_name.clone());
165
166    // Announce build start.
167    let started_at = chrono::Utc::now();
168    let plan_summary = PlanSummary {
169        step_count: graph.nodes.len(),
170        chain_count: chains.len(),
171        default_runner: "docker".into(),
172    };
173    bus.emit(BuildEvent::BuildStart {
174        run_id,
175        plan: plan_summary,
176        started_at,
177    });
178
179    // Schedule chains. Each chain runs sequentially internally; chains
180    // run concurrently subject to the semaphore and the chain_deps DAG.
181    let started_total = Instant::now();
182    let mut overall = 0i32;
183    let mut completed: HashSet<usize> = HashSet::new();
184    let mut pending: Vec<usize> = (0..chains.len()).collect();
185    let mut in_flight: tokio::task::JoinSet<(usize, Result<i32>)> = tokio::task::JoinSet::new();
186
187    loop {
188        // Spawn ready chains.
189        let mut still_pending = Vec::with_capacity(pending.len());
190        for ci in std::mem::take(&mut pending) {
191            let ready = chain_deps[ci].iter().all(|d| completed.contains(d));
192            if !ready {
193                still_pending.push(ci);
194                continue;
195            }
196            let semaphore = semaphore.clone();
197            let registry = registry.clone();
198            let graph = graph.clone();
199            let cancel = cancel.clone();
200            let chain_nodes = chains[ci].clone();
201            let bus = bus.clone();
202            let node_image = node_image.clone();
203            in_flight.spawn(async move {
204                let _permit = semaphore.acquire_owned().await.expect("semaphore");
205                if cancel.is_cancelled() {
206                    return (ci, Ok(0));
207                }
208                let rc = run_chain(
209                    ci,
210                    &graph,
211                    &chain_nodes,
212                    archive_id,
213                    run_id,
214                    &registry,
215                    &bus,
216                    &cancel,
217                    &node_image,
218                )
219                .await;
220                (ci, rc)
221            });
222        }
223        pending = still_pending;
224
225        if in_flight.is_empty() {
226            break;
227        }
228
229        match in_flight.join_next().await {
230            Some(Ok((ci, Ok(0)))) => {
231                completed.insert(ci);
232            }
233            Some(Ok((ci, Ok(_rc)))) => {
234                overall = crate::error::EXIT_BUILD_FAILED;
235                cancel.cancel();
236                completed.insert(ci);
237                // ChainFailed already emitted by run_chain; no stderr write here.
238            }
239            Some(Ok((_, Err(e)))) => {
240                cancel.cancel();
241                bus.emit(BuildEvent::BuildEnd {
242                    exit_code: crate::error::EXIT_BUILD_FAILED,
243                    duration_ms: started_total.elapsed().as_millis() as u64,
244                });
245                return Err(e);
246            }
247            Some(Err(je)) => {
248                cancel.cancel();
249                bus.emit(BuildEvent::BuildEnd {
250                    exit_code: crate::error::EXIT_BUILD_FAILED,
251                    duration_ms: started_total.elapsed().as_millis() as u64,
252                });
253                return Err(anyhow::anyhow!("chain task panicked: {je}"));
254            }
255            None => break,
256        }
257    }
258
259    let dur = started_total.elapsed().as_millis() as u64;
260    bus.emit(BuildEvent::BuildEnd {
261        exit_code: overall,
262        duration_ms: dur,
263    });
264
265    // Wait briefly for the sink to drain the BuildEnd event. It exits
266    // when it sees BuildEnd, so this completes quickly.
267    let _ = tokio::time::timeout(std::time::Duration::from_secs(2), sink_handle).await;
268
269    state::clear();
270    drop(state_arc);
271    Ok(overall)
272}
273
274/// Drive one chain end-to-end. Each step within a chain runs
275/// sequentially, with the previous step's snapshot becoming the next
276/// step's `parent_snapshot` input.
277///
278/// `node_image` is the cross-chain lineage map: when this chain's
279/// root is a fork-child (its `builds_in` parent lives in another
280/// chain), we look up the parent's committed snapshot there to seed
281/// our initial `parent_snapshot`. Each step we run records its
282/// committed snapshot back so downstream fork-children can find it.
283#[allow(
284    clippy::too_many_arguments,
285    reason = "tightly-coupled per-run state — splitting into a struct would just rename the bag"
286)]
287async fn run_chain(
288    chain_idx: usize,
289    graph: &Graph,
290    chain_nodes: &[usize],
291    archive_id: ArchiveId,
292    run_id: Uuid,
293    registry: &Arc<Mutex<PluginRegistry>>,
294    bus: &Arc<EventBus>,
295    cancel: &CancellationToken,
296    node_image: &Arc<Mutex<HashMap<usize, SnapshotRef>>>,
297) -> Result<i32> {
298    // Seed from the cross-chain lineage map: if this chain's root has
299    // a `builds_in` parent that already committed a snapshot, boot
300    // from it. Otherwise this is a chain-root proper and starts from
301    // the step's image.
302    let chain_root = chain_nodes[0];
303    let mut parent_snapshot: Option<SnapshotRef> = {
304        let g = node_image.lock().await;
305        graph.nodes[chain_root]
306            .builds_in
307            .and_then(|p| g.get(&p).cloned())
308    };
309
310    for (pos, &i) in chain_nodes.iter().enumerate() {
311        if cancel.is_cancelled() {
312            return Ok(0);
313        }
314        let step_wire = graph.nodes[i].step.clone();
315        // Keep a copy of the step key for diagnostics — `step_wire` is
316        // moved into `ExecutorInput` below.
317        let step_key = step_wire.key.clone();
318        let env_map: std::collections::BTreeMap<String, String> =
319            graph.nodes[i].env.clone().into_iter().collect();
320        let step_id = Uuid::new_v4();
321
322        bus.emit(BuildEvent::StepQueued {
323            step_id,
324            key: step_key.clone(),
325            chain_idx: pos,
326        });
327
328        // Decide cache outcome host-side.
329        let decision = {
330            let s = state::current().context("no orchestrator state")?;
331            cache::decide(&s.docker, &step_wire).await?
332        };
333        if let hm_plugin_protocol::CacheDecision::Hit { tag } = &decision {
334            bus.emit(BuildEvent::StepCacheHit {
335                step_id,
336                key: step_wire
337                    .cache
338                    .as_ref()
339                    .and_then(|c| c.key.clone())
340                    .unwrap_or_default(),
341                tag: tag.0.clone(),
342            });
343        }
344
345        let input = ExecutorInput {
346            step: step_wire,
347            workspace_archive_id: archive_id,
348            env: env_map,
349            workdir: "/workspace".to_string(),
350            run_id,
351            step_id,
352            cache_lookup: decision,
353            parent_snapshot: parent_snapshot.clone(),
354        };
355
356        // `input.step.runner` is the IR field as-declared. Steps that
357        // didn't declare a runner fall back to whichever plugin
358        // registered as `default: true` (docker, in the embedded
359        // binary). The hardcoded `"docker"` is only a last-resort
360        // fallback when no plugin claims default — practically
361        // unreachable, but cheap to keep so the dispatch lookup below
362        // still has a string to look up.
363        let runner = if let Some(name) = input.step.runner.clone() {
364            name
365        } else {
366            let reg = registry.lock().await;
367            reg.default_runner_name()
368                .map_or_else(|| "docker".into(), str::to_string)
369        };
370        let started = Instant::now();
371        bus.emit(BuildEvent::StepStart {
372            step_id,
373            runner: runner.clone(),
374            image: input.step.image.clone(),
375        });
376
377        // Dispatch to the runner-named plugin. Look up the Arc under
378        // the registry lock, drop the lock BEFORE awaiting so other
379        // chains can dispatch concurrently — the per-plugin pool
380        // serialises (or parallelises, up to its capacity) calls
381        // internally.
382        let plugin = {
383            let reg = registry.lock().await;
384            let idx = reg
385                .runner_index
386                .get(&runner)
387                .copied()
388                .or(reg.default_runner)
389                .ok_or_else(|| HmError::UnknownRunner {
390                    step_key: input.step.key.clone(),
391                    runner: runner.clone(),
392                    available: reg.runner_index.keys().cloned().collect(),
393                })?;
394            reg.get(idx).context("plugin moved away under us")?
395        };
396        crate::plugin::host_fns::set_current_step_id(step_id);
397        let result: Result<StepResult> = plugin.call_capability("hm_executor_run", &input).await;
398        crate::plugin::host_fns::clear_current_step_id();
399
400        let dur_ms = started.elapsed().as_millis() as u64;
401        match result {
402            Ok(sr) => {
403                bus.emit(BuildEvent::StepEnd {
404                    step_id,
405                    exit_code: sr.exit_code,
406                    duration_ms: dur_ms,
407                    snapshot: sr.committed_snapshot.clone(),
408                });
409                // Publish this step's committed snapshot to the
410                // cross-chain map so fork-children rooted at this
411                // node can boot from it.
412                if let Some(snap) = sr.committed_snapshot.clone() {
413                    let mut g = node_image.lock().await;
414                    g.insert(i, snap);
415                }
416                parent_snapshot = sr.committed_snapshot;
417                if sr.exit_code != 0 {
418                    bus.emit(BuildEvent::ChainFailed {
419                        chain_idx,
420                        failed_step_id: step_id,
421                        failed_step_key: step_key.clone(),
422                        exit_code: sr.exit_code,
423                        message: format!("step '{}' exited with code {}", step_key, sr.exit_code),
424                        ts: chrono::Utc::now(),
425                    });
426                    return Ok(sr.exit_code);
427                }
428            }
429            Err(e) => {
430                bus.emit(BuildEvent::StepEnd {
431                    step_id,
432                    exit_code: 1,
433                    duration_ms: dur_ms,
434                    snapshot: None,
435                });
436                return Err(e);
437            }
438        }
439    }
440    Ok(0)
441}