Skip to main content

hm_exec/local/runner/
vm.rs

1//! VM-based step runner.
2//!
3//! Each step runs inside a lightweight VM managed by [`HmVm`]. The
4//! source archive is extracted to a host-side temp directory and
5//! injected into the VM before the step command runs. System-level
6//! state propagates via VM snapshots.
7
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use anyhow::{Context, Result};
13use hm_plugin_protocol::{
14    BuildEvent, CacheDecision, ExecutorInput, SnapshotRef, StdStream, StepResult,
15};
16use hm_vm::types::OutputSink;
17use hm_vm::{Action, CachingPolicy, HmVm, ImageSource, SnapshotId};
18use uuid::Uuid;
19
20use super::{StepContext, StepRunner};
21use crate::local::events::EventBus;
22
23/// Step runner that executes pipeline steps inside lightweight VMs
24/// via the [`HmVm`] orchestrator.
25#[derive(Debug)]
26pub struct VmRunner {
27    vm: Arc<HmVm>,
28}
29
30impl VmRunner {
31    /// Create a new `VmRunner` backed by the given VM orchestrator.
32    #[must_use]
33    pub const fn new(vm: Arc<HmVm>) -> Self {
34        Self { vm }
35    }
36}
37
38impl StepRunner for VmRunner {
39    fn name(&self) -> &'static str {
40        "vm"
41    }
42
43    fn execute(
44        &self,
45        ctx: &StepContext,
46        input: ExecutorInput,
47    ) -> Pin<Box<dyn Future<Output = Result<StepResult>> + Send + '_>> {
48        let ctx = ctx.clone();
49        let vm = Arc::clone(&self.vm);
50        Box::pin(async move { run_step_vm(&vm, &ctx, input).await })
51    }
52
53    fn reap_snapshots<'a>(
54        &'a self,
55        snapshots: Vec<SnapshotRef>,
56    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
57        let vm = Arc::clone(&self.vm);
58        Box::pin(async move {
59            for snap in snapshots {
60                let id = SnapshotId::new(snap.0);
61                if let Err(e) = vm.remove_snapshot(&id).await {
62                    tracing::warn!(snapshot = %id, error = %e, "failed to reap ephemeral snapshot");
63                }
64            }
65        })
66    }
67}
68
69#[tracing::instrument(skip(vm, ctx), fields(step_key = %input.step.key))]
70async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Result<StepResult> {
71    let policy = match &input.cache_lookup {
72        CacheDecision::Hit { tag } | CacheDecision::MissBuildAs { tag } => {
73            CachingPolicy::Cache { key: tag.0.clone() }
74        }
75        CacheDecision::MissNoCommit => CachingPolicy::None,
76    };
77
78    let source = if let Some(ref snap) = input.parent_snapshot {
79        ImageSource::Snapshot(SnapshotId::new(snap.0.clone()))
80    } else {
81        // Imageless root step: default to an apt-capable base. The SDK's
82        // toolchains all assume `apt-get`, so `ubuntu:24.04` is the
83        // across-the-board default (alpine has no apt-get).
84        ImageSource::Image(
85            input
86                .step
87                .image
88                .clone()
89                .unwrap_or_else(|| "ubuntu:24.04".to_string()),
90        )
91    };
92
93    // Inject the current workspace on every executing step, overlaying it
94    // onto the system state inherited from the parent snapshot (apt packages,
95    // installed runtimes, `node_modules`, …). Injecting only at the chain root
96    // is wrong: root steps such as `apt_base` are `CacheForever`, so their
97    // snapshots freeze the source tree captured at first build and every COW
98    // descendant inherits that stale tree — source edits never reach leaf
99    // steps. A true cache hit short-circuits inside `HmVm::execute` before
100    // inject runs, so this overlay only happens when a step actually executes;
101    // the overlay (Docker PUT-archive) adds/overwrites files without deleting
102    // the inherited system state.
103    let (inject, _temp_guard) = {
104        let archive_bytes = ctx
105            .archives
106            .get_bytes(input.workspace_archive_id)
107            .ok_or_else(|| anyhow::anyhow!("source archive not found"))?;
108        let dir =
109            extract_archive_to_tempdir(&archive_bytes).context("extracting workspace archive")?;
110        let path = dir.path().to_path_buf();
111        (Some(path), Some(dir))
112    };
113
114    // Baseline env for shell operation inside VMs.
115    let mut env: Vec<(String, String)> = vec![
116        ("HOME".into(), "/root".into()),
117        (
118            "PATH".into(),
119            "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin".into(),
120        ),
121    ];
122    env.extend(input.env);
123
124    let action = Action {
125        source,
126        cmd: input.step.cmd.clone(),
127        env,
128        working_dir: input.workdir.clone(),
129        timeout: None,
130        inject,
131    };
132
133    let sink = EventBusSink {
134        step_id: input.step_id,
135        bus: Arc::clone(&ctx.event_bus),
136    };
137
138    let result = tokio::select! {
139        r = vm.execute(action, policy, &sink) => r,
140        () = ctx.cancel.cancelled() => {
141            anyhow::bail!("step cancelled (build timeout or sibling failure)")
142        }
143    }
144    .context("vm execute failed")?;
145
146    if result.cached {
147        ctx.event_bus.emit(BuildEvent::StepCacheHit {
148            step_id: input.step_id,
149            key: input
150                .step
151                .cache
152                .as_ref()
153                .and_then(|c| c.key.clone())
154                .unwrap_or_default(),
155            tag: result
156                .snapshot
157                .as_ref()
158                .map_or_else(String::new, ToString::to_string),
159        });
160    }
161
162    Ok(StepResult {
163        exit_code: result.exit_code,
164        committed_snapshot: result.snapshot.map(|s| SnapshotRef(s.to_string())),
165        artifacts: vec![],
166    })
167}
168
169/// Extracts a gzipped tar archive into a temporary directory.
170fn extract_archive_to_tempdir(archive_bytes: &[u8]) -> Result<tempfile::TempDir> {
171    let temp_dir = tempfile::tempdir().context("creating temp directory")?;
172    let decoder = flate2::read::GzDecoder::new(archive_bytes);
173    let mut archive = tar::Archive::new(decoder);
174    archive
175        .unpack(temp_dir.path())
176        .context("unpacking archive")?;
177    Ok(temp_dir)
178}
179
180/// [`OutputSink`] implementation that emits [`BuildEvent::StepLog`]
181/// events on the [`EventBus`].
182struct EventBusSink {
183    step_id: Uuid,
184    bus: Arc<EventBus>,
185}
186
187impl std::fmt::Debug for EventBusSink {
188    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189        f.debug_struct("EventBusSink")
190            .field("step_id", &self.step_id)
191            .finish_non_exhaustive()
192    }
193}
194
195impl OutputSink for EventBusSink {
196    fn on_stdout(&self, line: &str) {
197        self.bus.emit(BuildEvent::StepLog {
198            step_id: self.step_id,
199            stream: StdStream::Stdout,
200            line: line.to_owned(),
201            ts: chrono::Utc::now(),
202        });
203    }
204
205    fn on_stderr(&self, line: &str) {
206        self.bus.emit(BuildEvent::StepLog {
207            step_id: self.step_id,
208            stream: StdStream::Stderr,
209            line: line.to_owned(),
210            ts: chrono::Utc::now(),
211        });
212    }
213}