harmont_cli/orchestrator/
scheduler.rs1#![allow(
16 clippy::cast_possible_truncation,
17 clippy::expect_used,
18 clippy::too_many_lines,
19 clippy::missing_panics_doc,
20 clippy::significant_drop_tightening
25)]
26
27use std::collections::{HashMap, HashSet};
28use std::path::PathBuf;
29use std::sync::Arc;
30use std::time::Instant;
31
32use anyhow::{Context, Result};
33use hm_plugin_protocol::{
34 ArchiveId, BuildEvent, ExecutorInput, PlanSummary, SnapshotRef, StepResult,
35};
36use tokio::sync::Mutex;
37use uuid::Uuid;
38
39use crate::error::HmError;
40use crate::orchestrator::docker_client::DockerClient;
41use crate::orchestrator::graph::Graph;
42use crate::orchestrator::source::build_archive_bytes;
43use crate::plugin::{PluginRegistry, RegistryConfig};
44
45use super::archive::ArchiveStore;
46use super::cache;
47use super::cancel::CancellationToken;
48use super::events::EventBus;
49use super::state::{self, OrchestratorState};
50
51pub async fn run(
61 pipeline: hm_plugin_protocol::Pipeline,
62 repo_root: PathBuf,
63 parallelism: usize,
64 format_name: String,
65) -> Result<i32> {
66 let graph = Graph::build(&pipeline).context("build graph")?;
68 let chains = graph.chains();
69 let chain_deps = graph.chain_deps(&chains);
70
71 let bus = EventBus::new();
73 let archives = ArchiveStore::new();
74 let cancel = CancellationToken::new();
75 let _ctrlc = crate::plugin::signal::install_ctrlc(cancel.clone());
76 let docker = DockerClient::connect()
78 .map_err(|e| HmError::Docker(format!("daemon unreachable — is Docker running? ({e})")))?;
79 docker
80 .ping()
81 .await
82 .map_err(|e| HmError::Docker(format!("daemon ping failed: {e}")))?;
83 let run_id = Uuid::new_v4();
84
85 let archive_bytes = build_archive_bytes(&repo_root).context("build source archive")?;
87 let archive_id = archives.register(archive_bytes);
88
89 let state_arc = Arc::new(OrchestratorState {
91 event_bus: bus.clone(),
92 archives,
93 cancel: cancel.clone(),
94 docker: docker.clone(),
95 run_id,
96 });
97 state::install(state_arc.clone());
98
99 let parallelism = parallelism.max(1);
100
101 let mut pool_sizes: std::collections::BTreeMap<String, usize> =
106 std::collections::BTreeMap::new();
107 pool_sizes.insert("docker".to_string(), parallelism);
108 let registry = Arc::new(Mutex::new(
109 PluginRegistry::load(RegistryConfig {
110 auto_discover: true,
111 extra_paths: vec![],
112 embedded: vec![
113 (
114 "harmont-docker",
115 crate::plugin::embedded::DOCKER_PLUGIN_WASM,
116 ),
117 (
118 "harmont-output-human",
119 crate::plugin::embedded::OUTPUT_HUMAN_PLUGIN_WASM,
120 ),
121 (
122 "harmont-output-json",
123 crate::plugin::embedded::OUTPUT_JSON_PLUGIN_WASM,
124 ),
125 ],
126 pool_sizes,
127 })
128 .context("load plugin registry")?,
129 ));
130
131 let bad_format: Option<Vec<String>> = {
137 let reg = registry.lock().await;
138 if reg.output_formatter_index.contains_key(&format_name) {
139 None
140 } else {
141 let mut names: Vec<String> = reg.output_formatter_index.keys().cloned().collect();
142 names.sort();
143 Some(names)
144 }
145 };
146 if let Some(available) = bad_format {
147 anyhow::bail!(
148 "unknown --format '{format_name}'; available: {}",
149 available.join(", ")
150 );
151 }
152
153 let semaphore = Arc::new(tokio::sync::Semaphore::new(parallelism));
154
155 let node_image: Arc<Mutex<HashMap<usize, SnapshotRef>>> = Arc::new(Mutex::new(HashMap::new()));
160
161 let sink_handle =
164 super::output_subscriber::spawn(bus.clone(), registry.clone(), format_name.clone());
165
166 let started_at = chrono::Utc::now();
168 let plan_summary = PlanSummary {
169 step_count: graph.nodes.len(),
170 chain_count: chains.len(),
171 default_runner: "docker".into(),
172 };
173 bus.emit(BuildEvent::BuildStart {
174 run_id,
175 plan: plan_summary,
176 started_at,
177 });
178
179 let started_total = Instant::now();
182 let mut overall = 0i32;
183 let mut completed: HashSet<usize> = HashSet::new();
184 let mut pending: Vec<usize> = (0..chains.len()).collect();
185 let mut in_flight: tokio::task::JoinSet<(usize, Result<i32>)> = tokio::task::JoinSet::new();
186
187 loop {
188 let mut still_pending = Vec::with_capacity(pending.len());
190 for ci in std::mem::take(&mut pending) {
191 let ready = chain_deps[ci].iter().all(|d| completed.contains(d));
192 if !ready {
193 still_pending.push(ci);
194 continue;
195 }
196 let semaphore = semaphore.clone();
197 let registry = registry.clone();
198 let graph = graph.clone();
199 let cancel = cancel.clone();
200 let chain_nodes = chains[ci].clone();
201 let bus = bus.clone();
202 let node_image = node_image.clone();
203 in_flight.spawn(async move {
204 let _permit = semaphore.acquire_owned().await.expect("semaphore");
205 if cancel.is_cancelled() {
206 return (ci, Ok(0));
207 }
208 let rc = run_chain(
209 ci,
210 &graph,
211 &chain_nodes,
212 archive_id,
213 run_id,
214 ®istry,
215 &bus,
216 &cancel,
217 &node_image,
218 )
219 .await;
220 (ci, rc)
221 });
222 }
223 pending = still_pending;
224
225 if in_flight.is_empty() {
226 break;
227 }
228
229 match in_flight.join_next().await {
230 Some(Ok((ci, Ok(0)))) => {
231 completed.insert(ci);
232 }
233 Some(Ok((ci, Ok(_rc)))) => {
234 overall = crate::error::EXIT_BUILD_FAILED;
235 cancel.cancel();
236 completed.insert(ci);
237 }
239 Some(Ok((_, Err(e)))) => {
240 cancel.cancel();
241 bus.emit(BuildEvent::BuildEnd {
242 exit_code: crate::error::EXIT_BUILD_FAILED,
243 duration_ms: started_total.elapsed().as_millis() as u64,
244 });
245 return Err(e);
246 }
247 Some(Err(je)) => {
248 cancel.cancel();
249 bus.emit(BuildEvent::BuildEnd {
250 exit_code: crate::error::EXIT_BUILD_FAILED,
251 duration_ms: started_total.elapsed().as_millis() as u64,
252 });
253 return Err(anyhow::anyhow!("chain task panicked: {je}"));
254 }
255 None => break,
256 }
257 }
258
259 let dur = started_total.elapsed().as_millis() as u64;
260 bus.emit(BuildEvent::BuildEnd {
261 exit_code: overall,
262 duration_ms: dur,
263 });
264
265 let _ = tokio::time::timeout(std::time::Duration::from_secs(2), sink_handle).await;
268
269 state::clear();
270 drop(state_arc);
271 Ok(overall)
272}
273
274#[allow(
284 clippy::too_many_arguments,
285 reason = "tightly-coupled per-run state — splitting into a struct would just rename the bag"
286)]
287async fn run_chain(
288 chain_idx: usize,
289 graph: &Graph,
290 chain_nodes: &[usize],
291 archive_id: ArchiveId,
292 run_id: Uuid,
293 registry: &Arc<Mutex<PluginRegistry>>,
294 bus: &Arc<EventBus>,
295 cancel: &CancellationToken,
296 node_image: &Arc<Mutex<HashMap<usize, SnapshotRef>>>,
297) -> Result<i32> {
298 let chain_root = chain_nodes[0];
303 let mut parent_snapshot: Option<SnapshotRef> = {
304 let g = node_image.lock().await;
305 graph.nodes[chain_root]
306 .builds_in
307 .and_then(|p| g.get(&p).cloned())
308 };
309
310 for (pos, &i) in chain_nodes.iter().enumerate() {
311 if cancel.is_cancelled() {
312 return Ok(0);
313 }
314 let step_wire = graph.nodes[i].step.clone();
315 let step_key = step_wire.key.clone();
318 let env_map: std::collections::BTreeMap<String, String> =
319 graph.nodes[i].env.clone().into_iter().collect();
320 let step_id = Uuid::new_v4();
321
322 bus.emit(BuildEvent::StepQueued {
323 step_id,
324 key: step_key.clone(),
325 chain_idx: pos,
326 });
327
328 let decision = {
330 let s = state::current().context("no orchestrator state")?;
331 cache::decide(&s.docker, &step_wire).await?
332 };
333 if let hm_plugin_protocol::CacheDecision::Hit { tag } = &decision {
334 bus.emit(BuildEvent::StepCacheHit {
335 step_id,
336 key: step_wire
337 .cache
338 .as_ref()
339 .and_then(|c| c.key.clone())
340 .unwrap_or_default(),
341 tag: tag.0.clone(),
342 });
343 }
344
345 let input = ExecutorInput {
346 step: step_wire,
347 workspace_archive_id: archive_id,
348 env: env_map,
349 workdir: "/workspace".to_string(),
350 run_id,
351 step_id,
352 cache_lookup: decision,
353 parent_snapshot: parent_snapshot.clone(),
354 };
355
356 let runner = if let Some(name) = input.step.runner.clone() {
364 name
365 } else {
366 let reg = registry.lock().await;
367 reg.default_runner_name()
368 .map_or_else(|| "docker".into(), str::to_string)
369 };
370 let started = Instant::now();
371 bus.emit(BuildEvent::StepStart {
372 step_id,
373 runner: runner.clone(),
374 image: input.step.image.clone(),
375 });
376
377 let plugin = {
383 let reg = registry.lock().await;
384 let idx = reg
385 .runner_index
386 .get(&runner)
387 .copied()
388 .or(reg.default_runner)
389 .ok_or_else(|| HmError::UnknownRunner {
390 step_key: input.step.key.clone(),
391 runner: runner.clone(),
392 available: reg.runner_index.keys().cloned().collect(),
393 })?;
394 reg.get(idx).context("plugin moved away under us")?
395 };
396 crate::plugin::host_fns::set_current_step_id(step_id);
397 let result: Result<StepResult> = plugin.call_capability("hm_executor_run", &input).await;
398 crate::plugin::host_fns::clear_current_step_id();
399
400 let dur_ms = started.elapsed().as_millis() as u64;
401 match result {
402 Ok(sr) => {
403 bus.emit(BuildEvent::StepEnd {
404 step_id,
405 exit_code: sr.exit_code,
406 duration_ms: dur_ms,
407 snapshot: sr.committed_snapshot.clone(),
408 });
409 if let Some(snap) = sr.committed_snapshot.clone() {
413 let mut g = node_image.lock().await;
414 g.insert(i, snap);
415 }
416 parent_snapshot = sr.committed_snapshot;
417 if sr.exit_code != 0 {
418 bus.emit(BuildEvent::ChainFailed {
419 chain_idx,
420 failed_step_id: step_id,
421 failed_step_key: step_key.clone(),
422 exit_code: sr.exit_code,
423 message: format!("step '{}' exited with code {}", step_key, sr.exit_code),
424 ts: chrono::Utc::now(),
425 });
426 return Ok(sr.exit_code);
427 }
428 }
429 Err(e) => {
430 bus.emit(BuildEvent::StepEnd {
431 step_id,
432 exit_code: 1,
433 duration_ms: dur_ms,
434 snapshot: None,
435 });
436 return Err(e);
437 }
438 }
439 }
440 Ok(0)
441}