harmont_cli/orchestrator/
scheduler.rs1#![allow(
19 clippy::cast_possible_truncation,
20 clippy::expect_used,
21 clippy::too_many_lines,
22 clippy::missing_panics_doc
23)]
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::time::Instant;
29
30use daggy::petgraph::algo::toposort;
31use daggy::{Dag, NodeIndex, Walker};
32use futures::future::{BoxFuture, FutureExt, join_all};
33
34use anyhow::{Context, Result};
35use hm_plugin_protocol::{
36 ArchiveId, BuildEvent, CacheDecision, ExecutorInput, PlanSummary, SnapshotRef, StepResult,
37};
38use uuid::Uuid;
39
40use hm_pipeline_ir::{EdgeKind, PipelineGraph, Transition};
41
42use crate::error::HmError;
43use crate::orchestrator::docker_client::DockerClient;
44use crate::orchestrator::source::build_archive_bytes;
45use crate::runner::{OutputRenderer, RunContext, RunnerRegistry};
46
47use super::archive::ArchiveStore;
48use super::cache;
49use super::events::EventBus;
50use tokio_util::sync::CancellationToken;
51
52#[derive(Clone)]
53struct StepOutcome {
54 exit_code: i32,
55 snapshot: Option<SnapshotRef>,
56}
57
58type StepFuture = futures::future::Shared<BoxFuture<'static, StepOutcome>>;
59
60pub async fn run(
70 graph: PipelineGraph,
71 repo_root: PathBuf,
72 parallelism: usize,
73 runner_registry: Arc<RunnerRegistry>,
74 renderer: Box<dyn OutputRenderer>,
75) -> Result<i32> {
76 let bus = EventBus::new();
78 let archives = Arc::new(ArchiveStore::new());
79 let cancel = CancellationToken::new();
80 let _ctrlc = super::signal::install_ctrlc(cancel.clone());
81 let docker = DockerClient::connect()
83 .map_err(|e| HmError::Docker(format!("daemon unreachable — is Docker running? ({e})")))?;
84 docker
85 .ping()
86 .await
87 .map_err(|e| HmError::Docker(format!("daemon ping failed: {e}")))?;
88 let run_id = Uuid::new_v4();
89
90 let archive_bytes = build_archive_bytes(&repo_root).context("build source archive")?;
92 let archive_id = archives.register(archive_bytes);
93
94 let run_ctx = RunContext {
95 docker: docker.clone(),
96 event_bus: bus.clone(),
97 archives: archives.clone(),
98 cancel: cancel.clone(),
99 };
100
101 let parallelism = parallelism.max(1);
102
103 let semaphore = Arc::new(tokio::sync::Semaphore::new(parallelism));
104
105 let sink_handle = super::output_subscriber::spawn(bus.clone(), renderer);
108
109 let dag = graph.dag();
110 let chain_info = compute_chain_info(dag);
111
112 let order = toposort(dag.graph(), None)
113 .map_err(|c| anyhow::anyhow!("pipeline graph has a cycle at {:?}", c.node_id()))?;
114
115 let started_at = chrono::Utc::now();
116 bus.emit(BuildEvent::BuildStart {
117 run_id,
118 plan: PlanSummary {
119 step_count: graph.node_count(),
120 chain_count: chain_info.chain_count,
121 default_runner: runner_registry
122 .default_runner_name()
123 .unwrap_or("docker")
124 .into(),
125 },
126 started_at,
127 });
128
129 let started_total = Instant::now();
130
131 let mut done: HashMap<NodeIndex, StepFuture> = HashMap::new();
132
133 for &n in &order {
134 let preds: Vec<(EdgeKind, StepFuture)> = dag
135 .parents(n)
136 .iter(dag)
137 .map(|(e, p)| (*dag.edge_weight(e).expect("edge in DAG"), done[&p].clone()))
138 .collect();
139
140 let transition = dag[n].clone();
141 let chain_id = chain_info.node_chain_id[&n];
142 let chain_pos = chain_info.node_chain_pos[&n];
143 let parent_key: Option<String> = dag
144 .parents(n)
145 .iter(dag)
146 .find(|(e, _)| dag.edge_weight(*e).copied() == Some(EdgeKind::BuildsIn))
147 .map(|(_, p)| dag[p].step.key.clone());
148 let sem = semaphore.clone();
149 let reg = runner_registry.clone();
150 let bus = bus.clone();
151 let cancel = cancel.clone();
152 let run_ctx = run_ctx.clone();
153
154 let fut: StepFuture = async move {
155 let pred_outcomes: Vec<StepOutcome> =
157 join_all(preds.iter().map(|(_, f)| f.clone())).await;
158
159 if cancel.is_cancelled() || pred_outcomes.iter().any(|o| o.exit_code != 0) {
161 return StepOutcome {
162 exit_code: 0,
163 snapshot: None,
164 };
165 }
166
167 let _permit = sem
169 .acquire_owned()
170 .await
171 .expect("semaphore closed unexpectedly");
172
173 let parent_snapshot = preds
175 .iter()
176 .zip(&pred_outcomes)
177 .find(|((ek, _), _)| *ek == EdgeKind::BuildsIn)
178 .and_then(|(_, outcome)| outcome.snapshot.clone());
179
180 match execute_step(
181 n,
182 transition,
183 parent_snapshot,
184 chain_id,
185 chain_pos,
186 parent_key,
187 archive_id,
188 run_id,
189 run_ctx,
190 reg,
191 bus,
192 cancel,
193 )
194 .await
195 {
196 Ok(outcome) => outcome,
197 Err(e) => {
198 tracing::error!(%e, "step execution failed");
199 StepOutcome {
200 exit_code: 1,
201 snapshot: None,
202 }
203 }
204 }
205 }
206 .boxed()
207 .shared();
208
209 tokio::spawn(fut.clone());
210 done.insert(n, fut);
211 }
212
213 let outcomes: Vec<StepOutcome> = join_all(done.into_values()).await;
214 let overall = if outcomes.iter().any(|o| o.exit_code != 0) {
215 crate::error::EXIT_BUILD_FAILED
216 } else {
217 0
218 };
219
220 let dur = started_total.elapsed().as_millis() as u64;
221
222 let ephemeral_tags: Vec<&str> = outcomes
224 .iter()
225 .filter_map(|o| o.snapshot.as_ref())
226 .filter(|s| s.0.starts_with("harmont-local-ephemeral/"))
227 .map(|s| s.0.as_str())
228 .collect();
229 for tag in ephemeral_tags {
230 if let Err(e) = docker.remove_image(tag).await {
231 tracing::warn!(image = %tag, %e, "failed to remove ephemeral image");
232 }
233 }
234
235 bus.emit(BuildEvent::BuildEnd {
236 exit_code: overall,
237 duration_ms: dur,
238 });
239
240 let _ = tokio::time::timeout(std::time::Duration::from_secs(2), sink_handle).await;
241
242 Ok(overall)
243}
244
245#[allow(clippy::too_many_arguments)]
254async fn execute_step(
255 _node_idx: NodeIndex,
256 transition: Transition,
257 parent_snapshot: Option<SnapshotRef>,
258 chain_id: usize,
259 chain_pos: usize,
260 parent_key: Option<String>,
261 archive_id: ArchiveId,
262 run_id: Uuid,
263 run_ctx: RunContext,
264 runner_registry: Arc<RunnerRegistry>,
265 bus: Arc<EventBus>,
266 cancel: CancellationToken,
267) -> Result<StepOutcome> {
268 let step_wire = transition.step;
269 let step_key = step_wire.key.clone();
270 let display_name = step_wire.label.clone().unwrap_or_else(|| {
271 let cmd = step_wire.cmd.trim();
272 if cmd.len() <= 40 {
273 cmd.to_owned()
274 } else {
275 format!("{}…", &cmd[..39])
276 }
277 });
278 let env_map = transition.env;
279 let step_id = Uuid::new_v4();
280
281 bus.emit(BuildEvent::StepQueued {
282 step_id,
283 key: step_key.clone(),
284 chain_idx: chain_pos,
285 parent_key: parent_key.clone(),
286 display_name: display_name.clone(),
287 });
288
289 let cache_tag = cache::stable_cache_tag(&step_wire);
291 if let Some(ref dtag) = cache_tag
292 && run_ctx.docker.image_exists(dtag).await.unwrap_or(false)
293 {
294 bus.emit(BuildEvent::StepCacheHit {
295 step_id,
296 key: step_wire
297 .cache
298 .as_ref()
299 .and_then(|c| c.key.clone())
300 .unwrap_or_default(),
301 tag: dtag.clone(),
302 });
303 return Ok(StepOutcome {
304 exit_code: 0,
305 snapshot: Some(SnapshotRef::from(dtag.clone())),
306 });
307 }
308
309 let cache_lookup = cache_tag
310 .as_ref()
311 .map_or(CacheDecision::MissNoCommit, |tag| {
312 CacheDecision::MissBuildAs {
313 tag: SnapshotRef::from(tag.clone()),
314 }
315 });
316
317 let input = ExecutorInput {
318 step: step_wire,
319 workspace_archive_id: archive_id,
320 env: env_map,
321 workdir: "/workspace".to_string(),
322 run_id,
323 step_id,
324 cache_lookup,
325 parent_snapshot,
326 };
327
328 let runner_name = input
331 .step
332 .runner
333 .as_deref()
334 .or_else(|| runner_registry.default_runner_name())
335 .unwrap_or("docker")
336 .to_owned();
337
338 let started = Instant::now();
339 bus.emit(BuildEvent::StepStart {
340 step_id,
341 runner: runner_name.clone(),
342 image: input.step.image.clone(),
343 });
344
345 let runner = runner_registry
346 .resolve(input.step.runner.as_deref())
347 .ok_or_else(|| HmError::UnknownRunner {
348 step_key: input.step.key.clone(),
349 runner: runner_name.clone(),
350 available: runner_registry
351 .runner_names()
352 .into_iter()
353 .map(str::to_owned)
354 .collect(),
355 })?;
356
357 let result: Result<StepResult> = runner.execute(&run_ctx, input).await;
358
359 let dur_ms = started.elapsed().as_millis() as u64;
360 match result {
361 Ok(sr) => {
362 bus.emit(BuildEvent::StepEnd {
363 step_id,
364 exit_code: sr.exit_code,
365 duration_ms: dur_ms,
366 snapshot: sr.committed_snapshot.clone(),
367 });
368 if sr.exit_code != 0 {
369 bus.emit(BuildEvent::ChainFailed {
370 chain_idx: chain_id,
371 failed_step_id: step_id,
372 failed_step_key: step_key.clone(),
373 exit_code: sr.exit_code,
374 message: format!("step '{}' exited with code {}", step_key, sr.exit_code),
375 ts: chrono::Utc::now(),
376 });
377 cancel.cancel();
378 } else if let Some(ref snapshot) = sr.committed_snapshot {
379 if let Some(ref dtag) = cache_tag
380 && snapshot.0 != *dtag
381 && let Err(e) = run_ctx.docker.tag_image(&snapshot.0, dtag).await
382 {
383 tracing::warn!(%e, "failed to re-tag Docker image for cache");
384 }
385 cache::evict_stale_docker_tags(&run_ctx.docker, &step_key, cache_tag.as_deref())
386 .await;
387 }
388 Ok(StepOutcome {
389 exit_code: sr.exit_code,
390 snapshot: sr.committed_snapshot,
391 })
392 }
393 Err(e) => {
394 bus.emit(BuildEvent::StepEnd {
395 step_id,
396 exit_code: 1,
397 duration_ms: dur_ms,
398 snapshot: None,
399 });
400 Err(e)
401 }
402 }
403}
404
405struct ChainInfo {
408 chain_count: usize,
409 node_chain_id: HashMap<NodeIndex, usize>,
410 node_chain_pos: HashMap<NodeIndex, usize>,
411}
412
413fn compute_chain_info(dag: &Dag<Transition, EdgeKind>) -> ChainInfo {
419 let mut node_chain_id: HashMap<NodeIndex, usize> = HashMap::new();
420 let mut node_chain_pos: HashMap<NodeIndex, usize> = HashMap::new();
421 let mut chain_count: usize = 0;
422
423 let mut indices: Vec<NodeIndex> = dag.graph().node_indices().collect();
425 indices.sort();
426
427 for idx in indices {
428 if node_chain_id.contains_key(&idx) {
429 continue;
430 }
431
432 let chain_id = chain_count;
434 chain_count += 1;
435
436 let mut cur = idx;
437 let mut pos: usize = 0;
438 loop {
439 node_chain_id.insert(cur, chain_id);
440 node_chain_pos.insert(cur, pos);
441 pos += 1;
442
443 let builds_in_children: Vec<NodeIndex> = dag
445 .children(cur)
446 .iter(dag)
447 .filter(|(e, _)| dag.edge_weight(*e).copied() == Some(EdgeKind::BuildsIn))
448 .map(|(_, child)| child)
449 .collect();
450
451 if builds_in_children.len() != 1 {
453 break;
454 }
455 let child = builds_in_children[0];
456
457 if node_chain_id.contains_key(&child) {
459 break;
460 }
461
462 let parent_count = dag.parents(child).iter(dag).count();
464 if parent_count != 1 {
465 break;
466 }
467
468 cur = child;
469 }
470 }
471
472 ChainInfo {
473 chain_count,
474 node_chain_id,
475 node_chain_pos,
476 }
477}