Skip to main content

harmont_cli/orchestrator/
scheduler.rs

1//! Dataflow scheduler.
2//!
3//! Walks the pipeline DAG in topological order, spawning a shared
4//! future per step. Each future awaits its predecessors, acquires a
5//! parallelism permit, and dispatches the step to its registered
6//! runner (Docker by default).
7
8// Pedantic-bucket nags accepted at module scope:
9// - `cast_possible_truncation`: every `as u64` here is a millisecond
10//   wall-clock duration; `u128 -> u64` cannot overflow for any
11//   conceivable build runtime (584 million years).
12// - `expect_used`: semaphore acquire and DAG edge-weight lookups on
13//   edges that are guaranteed to exist by construction.
14// - `too_many_lines` on `run`: setup + dataflow loop form one
15//   cohesive unit; splitting would obscure the spawn/join symmetry.
16// - `missing_panics_doc`: the only panic paths are the semaphore and
17//   edge-weight expects described above.
18#![allow(
19    clippy::cast_possible_truncation,
20    clippy::expect_used,
21    clippy::too_many_lines,
22    clippy::missing_panics_doc
23)]
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::time::Instant;
29
30use daggy::petgraph::algo::toposort;
31use daggy::{Dag, NodeIndex, Walker};
32use futures::future::{BoxFuture, FutureExt, join_all};
33
34use anyhow::{Context, Result};
35use hm_plugin_protocol::{
36    ArchiveId, BuildEvent, CacheDecision, ExecutorInput, PlanSummary, SnapshotRef, StepResult,
37};
38use uuid::Uuid;
39
40use hm_pipeline_ir::{EdgeKind, PipelineGraph, Transition};
41
42use crate::error::HmError;
43use crate::orchestrator::docker_client::DockerClient;
44use crate::orchestrator::source::build_archive_bytes;
45use crate::runner::{OutputRenderer, RunContext, RunnerRegistry};
46
47use super::archive::ArchiveStore;
48use super::cache;
49use super::events::EventBus;
50use tokio_util::sync::CancellationToken;
51
52#[derive(Clone)]
53struct StepOutcome {
54    exit_code: i32,
55    snapshot: Option<SnapshotRef>,
56}
57
58type StepFuture = futures::future::Shared<BoxFuture<'static, StepOutcome>>;
59
60/// Entry point: run a parsed pipeline locally end-to-end. Returns
61/// the overall exit code (0 = success, [`crate::error::EXIT_BUILD_FAILED`]
62/// when any step exited non-zero).
63///
64/// # Errors
65/// Returns an error if the source archive cannot be built, the Docker
66/// daemon is unreachable, or any scheduler-level failure occurs.
67/// Non-zero step exit codes are surfaced via the returned `i32`, not
68/// as an Err.
69pub async fn run(
70    graph: PipelineGraph,
71    repo_root: PathBuf,
72    parallelism: usize,
73    runner_registry: Arc<RunnerRegistry>,
74    renderer: Box<dyn OutputRenderer>,
75) -> Result<i32> {
76    // Set up per-run state.
77    let bus = EventBus::new();
78    let archives = Arc::new(ArchiveStore::new());
79    let cancel = CancellationToken::new();
80    let _ctrlc = super::signal::install_ctrlc(cancel.clone());
81    // _ctrlc dropped at end of `run`; runtime tear-down kills the task.
82    let docker = DockerClient::connect()
83        .map_err(|e| HmError::Docker(format!("daemon unreachable — is Docker running? ({e})")))?;
84    docker
85        .ping()
86        .await
87        .map_err(|e| HmError::Docker(format!("daemon ping failed: {e}")))?;
88    let run_id = Uuid::new_v4();
89
90    // Build the source archive once.
91    let archive_bytes = build_archive_bytes(&repo_root).context("build source archive")?;
92    let archive_id = archives.register(archive_bytes);
93
94    let run_ctx = RunContext {
95        docker: docker.clone(),
96        event_bus: bus.clone(),
97        archives: archives.clone(),
98        cancel: cancel.clone(),
99    };
100
101    let parallelism = parallelism.max(1);
102
103    let semaphore = Arc::new(tokio::sync::Semaphore::new(parallelism));
104
105    // Spawn the output subscriber. Dispatches every BuildEvent to the
106    // pre-constructed renderer.
107    let sink_handle = super::output_subscriber::spawn(bus.clone(), renderer);
108
109    let dag = graph.dag();
110    let chain_info = compute_chain_info(dag);
111
112    let order = toposort(dag.graph(), None)
113        .map_err(|c| anyhow::anyhow!("pipeline graph has a cycle at {:?}", c.node_id()))?;
114
115    let started_at = chrono::Utc::now();
116    bus.emit(BuildEvent::BuildStart {
117        run_id,
118        plan: PlanSummary {
119            step_count: graph.node_count(),
120            chain_count: chain_info.chain_count,
121            default_runner: runner_registry
122                .default_runner_name()
123                .unwrap_or("docker")
124                .into(),
125        },
126        started_at,
127    });
128
129    let started_total = Instant::now();
130
131    let mut done: HashMap<NodeIndex, StepFuture> = HashMap::new();
132
133    for &n in &order {
134        let preds: Vec<(EdgeKind, StepFuture)> = dag
135            .parents(n)
136            .iter(dag)
137            .map(|(e, p)| (*dag.edge_weight(e).expect("edge in DAG"), done[&p].clone()))
138            .collect();
139
140        let transition = dag[n].clone();
141        let chain_id = chain_info.node_chain_id[&n];
142        let chain_pos = chain_info.node_chain_pos[&n];
143        let parent_key: Option<String> = dag
144            .parents(n)
145            .iter(dag)
146            .find(|(e, _)| dag.edge_weight(*e).copied() == Some(EdgeKind::BuildsIn))
147            .map(|(_, p)| dag[p].step.key.clone());
148        let sem = semaphore.clone();
149        let reg = runner_registry.clone();
150        let bus = bus.clone();
151        let cancel = cancel.clone();
152        let run_ctx = run_ctx.clone();
153
154        let fut: StepFuture = async move {
155            // Await all predecessors.
156            let pred_outcomes: Vec<StepOutcome> =
157                join_all(preds.iter().map(|(_, f)| f.clone())).await;
158
159            // Early exit if any predecessor failed or the build was cancelled.
160            if cancel.is_cancelled() || pred_outcomes.iter().any(|o| o.exit_code != 0) {
161                return StepOutcome {
162                    exit_code: 0,
163                    snapshot: None,
164                };
165            }
166
167            // Acquire parallelism permit.
168            let _permit = sem
169                .acquire_owned()
170                .await
171                .expect("semaphore closed unexpectedly");
172
173            // Find the BuildsIn parent's snapshot for container lineage.
174            let parent_snapshot = preds
175                .iter()
176                .zip(&pred_outcomes)
177                .find(|((ek, _), _)| *ek == EdgeKind::BuildsIn)
178                .and_then(|(_, outcome)| outcome.snapshot.clone());
179
180            match execute_step(
181                n,
182                transition,
183                parent_snapshot,
184                chain_id,
185                chain_pos,
186                parent_key,
187                archive_id,
188                run_id,
189                run_ctx,
190                reg,
191                bus,
192                cancel,
193            )
194            .await
195            {
196                Ok(outcome) => outcome,
197                Err(e) => {
198                    tracing::error!(%e, "step execution failed");
199                    StepOutcome {
200                        exit_code: 1,
201                        snapshot: None,
202                    }
203                }
204            }
205        }
206        .boxed()
207        .shared();
208
209        tokio::spawn(fut.clone());
210        done.insert(n, fut);
211    }
212
213    let outcomes: Vec<StepOutcome> = join_all(done.into_values()).await;
214    let overall = if outcomes.iter().any(|o| o.exit_code != 0) {
215        crate::error::EXIT_BUILD_FAILED
216    } else {
217        0
218    };
219
220    let dur = started_total.elapsed().as_millis() as u64;
221
222    // Clean up ephemeral images created during the run.
223    let ephemeral_tags: Vec<&str> = outcomes
224        .iter()
225        .filter_map(|o| o.snapshot.as_ref())
226        .filter(|s| s.0.starts_with("harmont-local-ephemeral/"))
227        .map(|s| s.0.as_str())
228        .collect();
229    for tag in ephemeral_tags {
230        if let Err(e) = docker.remove_image(tag).await {
231            tracing::warn!(image = %tag, %e, "failed to remove ephemeral image");
232        }
233    }
234
235    bus.emit(BuildEvent::BuildEnd {
236        exit_code: overall,
237        duration_ms: dur,
238    });
239
240    let _ = tokio::time::timeout(std::time::Duration::from_secs(2), sink_handle).await;
241
242    Ok(overall)
243}
244
245/// Execute a single step, returning its outcome (exit code + snapshot).
246///
247/// On cache hit the function returns early with exit code 0 and the
248/// cached snapshot so downstream nodes receive the correct
249/// `parent_snapshot` without running the runner at all.
250///
251/// On non-zero exit the cancellation token is cancelled so sibling
252/// tasks observe the failure promptly.
253#[allow(clippy::too_many_arguments)]
254async fn execute_step(
255    _node_idx: NodeIndex,
256    transition: Transition,
257    parent_snapshot: Option<SnapshotRef>,
258    chain_id: usize,
259    chain_pos: usize,
260    parent_key: Option<String>,
261    archive_id: ArchiveId,
262    run_id: Uuid,
263    run_ctx: RunContext,
264    runner_registry: Arc<RunnerRegistry>,
265    bus: Arc<EventBus>,
266    cancel: CancellationToken,
267) -> Result<StepOutcome> {
268    let step_wire = transition.step;
269    let step_key = step_wire.key.clone();
270    let display_name = step_wire.label.clone().unwrap_or_else(|| {
271        let cmd = step_wire.cmd.trim();
272        if cmd.len() <= 40 {
273            cmd.to_owned()
274        } else {
275            format!("{}…", &cmd[..39])
276        }
277    });
278    let env_map = transition.env;
279    let step_id = Uuid::new_v4();
280
281    bus.emit(BuildEvent::StepQueued {
282        step_id,
283        key: step_key.clone(),
284        chain_idx: chain_pos,
285        parent_key: parent_key.clone(),
286        display_name: display_name.clone(),
287    });
288
289    // --- Docker image cache check ---
290    let cache_tag = cache::stable_cache_tag(&step_wire);
291    if let Some(ref dtag) = cache_tag
292        && run_ctx.docker.image_exists(dtag).await.unwrap_or(false)
293    {
294        bus.emit(BuildEvent::StepCacheHit {
295            step_id,
296            key: step_wire
297                .cache
298                .as_ref()
299                .and_then(|c| c.key.clone())
300                .unwrap_or_default(),
301            tag: dtag.clone(),
302        });
303        return Ok(StepOutcome {
304            exit_code: 0,
305            snapshot: Some(SnapshotRef::from(dtag.clone())),
306        });
307    }
308
309    let cache_lookup = cache_tag
310        .as_ref()
311        .map_or(CacheDecision::MissNoCommit, |tag| {
312            CacheDecision::MissBuildAs {
313                tag: SnapshotRef::from(tag.clone()),
314            }
315        });
316
317    let input = ExecutorInput {
318        step: step_wire,
319        workspace_archive_id: archive_id,
320        env: env_map,
321        workdir: "/workspace".to_string(),
322        run_id,
323        step_id,
324        cache_lookup,
325        parent_snapshot,
326    };
327
328    // Resolve the runner by name. Steps that didn't declare a runner
329    // fall back to whichever runner was registered as default (docker).
330    let runner_name = input
331        .step
332        .runner
333        .as_deref()
334        .or_else(|| runner_registry.default_runner_name())
335        .unwrap_or("docker")
336        .to_owned();
337
338    let started = Instant::now();
339    bus.emit(BuildEvent::StepStart {
340        step_id,
341        runner: runner_name.clone(),
342        image: input.step.image.clone(),
343    });
344
345    let runner = runner_registry
346        .resolve(input.step.runner.as_deref())
347        .ok_or_else(|| HmError::UnknownRunner {
348            step_key: input.step.key.clone(),
349            runner: runner_name.clone(),
350            available: runner_registry
351                .runner_names()
352                .into_iter()
353                .map(str::to_owned)
354                .collect(),
355        })?;
356
357    let result: Result<StepResult> = runner.execute(&run_ctx, input).await;
358
359    let dur_ms = started.elapsed().as_millis() as u64;
360    match result {
361        Ok(sr) => {
362            bus.emit(BuildEvent::StepEnd {
363                step_id,
364                exit_code: sr.exit_code,
365                duration_ms: dur_ms,
366                snapshot: sr.committed_snapshot.clone(),
367            });
368            if sr.exit_code != 0 {
369                bus.emit(BuildEvent::ChainFailed {
370                    chain_idx: chain_id,
371                    failed_step_id: step_id,
372                    failed_step_key: step_key.clone(),
373                    exit_code: sr.exit_code,
374                    message: format!("step '{}' exited with code {}", step_key, sr.exit_code),
375                    ts: chrono::Utc::now(),
376                });
377                cancel.cancel();
378            } else if let Some(ref snapshot) = sr.committed_snapshot {
379                if let Some(ref dtag) = cache_tag
380                    && snapshot.0 != *dtag
381                    && let Err(e) = run_ctx.docker.tag_image(&snapshot.0, dtag).await
382                {
383                    tracing::warn!(%e, "failed to re-tag Docker image for cache");
384                }
385                cache::evict_stale_docker_tags(&run_ctx.docker, &step_key, cache_tag.as_deref())
386                    .await;
387            }
388            Ok(StepOutcome {
389                exit_code: sr.exit_code,
390                snapshot: sr.committed_snapshot,
391            })
392        }
393        Err(e) => {
394            bus.emit(BuildEvent::StepEnd {
395                step_id,
396                exit_code: 1,
397                duration_ms: dur_ms,
398                snapshot: None,
399            });
400            Err(e)
401        }
402    }
403}
404
405/// Per-node chain membership used for event enrichment. Maps every
406/// node in the DAG to (`chain_id`, `position_within_chain`).
407struct ChainInfo {
408    chain_count: usize,
409    node_chain_id: HashMap<NodeIndex, usize>,
410    node_chain_pos: HashMap<NodeIndex, usize>,
411}
412
413/// Walk the DAG and assign each node to a linear chain. A chain starts
414/// at any node not yet assigned and extends forward through single
415/// `BuildsIn` children where the child has exactly one parent total.
416/// This mirrors `PipelineGraph::chains()` but lives as a free function
417/// operating on the raw `Dag`.
418fn compute_chain_info(dag: &Dag<Transition, EdgeKind>) -> ChainInfo {
419    let mut node_chain_id: HashMap<NodeIndex, usize> = HashMap::new();
420    let mut node_chain_pos: HashMap<NodeIndex, usize> = HashMap::new();
421    let mut chain_count: usize = 0;
422
423    // Walk nodes in index order.
424    let mut indices: Vec<NodeIndex> = dag.graph().node_indices().collect();
425    indices.sort();
426
427    for idx in indices {
428        if node_chain_id.contains_key(&idx) {
429            continue;
430        }
431
432        // Start a new chain rooted at this unvisited node.
433        let chain_id = chain_count;
434        chain_count += 1;
435
436        let mut cur = idx;
437        let mut pos: usize = 0;
438        loop {
439            node_chain_id.insert(cur, chain_id);
440            node_chain_pos.insert(cur, pos);
441            pos += 1;
442
443            // Collect BuildsIn children of `cur`.
444            let builds_in_children: Vec<NodeIndex> = dag
445                .children(cur)
446                .iter(dag)
447                .filter(|(e, _)| dag.edge_weight(*e).copied() == Some(EdgeKind::BuildsIn))
448                .map(|(_, child)| child)
449                .collect();
450
451            // Follow the chain only if there's exactly one BuildsIn child...
452            if builds_in_children.len() != 1 {
453                break;
454            }
455            let child = builds_in_children[0];
456
457            // ...that hasn't been assigned yet...
458            if node_chain_id.contains_key(&child) {
459                break;
460            }
461
462            // ...and that child has exactly one parent total.
463            let parent_count = dag.parents(child).iter(dag).count();
464            if parent_count != 1 {
465                break;
466            }
467
468            cur = child;
469        }
470    }
471
472    ChainInfo {
473        chain_count,
474        node_chain_id,
475        node_chain_pos,
476    }
477}