Skip to main content

hm_exec/local/runner/
vm.rs

1//! VM-based step runner.
2//!
3//! Each step runs inside a lightweight VM managed by [`HmVm`]. The
4//! source archive is extracted to a host-side temp directory and
5//! injected into the VM before the step command runs. System-level
6//! state propagates via VM snapshots.
7
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use anyhow::{Context, Result};
13use hm_plugin_protocol::{
14    BuildEvent, CacheDecision, ExecutorInput, SnapshotRef, StdStream, StepResult,
15};
16use hm_vm::types::OutputSink;
17use hm_vm::{Action, CachingPolicy, HmVm, ImageSource, SnapshotId};
18use uuid::Uuid;
19
20use super::{StepContext, StepRunner};
21use crate::local::events::EventBus;
22
23/// Step runner that executes pipeline steps inside lightweight VMs
24/// via the [`HmVm`] orchestrator.
25#[derive(Debug)]
26pub struct VmRunner {
27    vm: Arc<HmVm>,
28}
29
30impl VmRunner {
31    /// Create a new `VmRunner` backed by the given VM orchestrator.
32    #[must_use]
33    pub const fn new(vm: Arc<HmVm>) -> Self {
34        Self { vm }
35    }
36}
37
38impl StepRunner for VmRunner {
39    fn name(&self) -> &'static str {
40        "vm"
41    }
42
43    fn execute(
44        &self,
45        ctx: &StepContext,
46        input: ExecutorInput,
47    ) -> Pin<Box<dyn Future<Output = Result<StepResult>> + Send + '_>> {
48        let ctx = ctx.clone();
49        let vm = Arc::clone(&self.vm);
50        Box::pin(async move { run_step_vm(&vm, &ctx, input).await })
51    }
52
53    fn reap_snapshots<'a>(
54        &'a self,
55        snapshots: Vec<SnapshotRef>,
56    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
57        let vm = Arc::clone(&self.vm);
58        Box::pin(async move {
59            for snap in snapshots {
60                let id = SnapshotId::new(snap.0);
61                if let Err(e) = vm.remove_snapshot(&id).await {
62                    tracing::warn!(snapshot = %id, error = %e, "failed to reap ephemeral snapshot");
63                }
64            }
65        })
66    }
67}
68
69#[tracing::instrument(skip(vm, ctx), fields(step_key = %input.step.key))]
70async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Result<StepResult> {
71    let policy = match &input.cache_lookup {
72        CacheDecision::Hit { tag } | CacheDecision::MissBuildAs { tag } => {
73            CachingPolicy::Cache { key: tag.0.clone() }
74        }
75        CacheDecision::MissNoCommit => CachingPolicy::None,
76    };
77
78    let source = if let Some(ref snap) = input.parent_snapshot {
79        ImageSource::Snapshot(SnapshotId::new(snap.0.clone()))
80    } else {
81        ImageSource::Image(
82            input
83                .step
84                .image
85                .clone()
86                .unwrap_or_else(|| "alpine:latest".to_string()),
87        )
88    };
89
90    // Inject the current workspace on every executing step, overlaying it
91    // onto the system state inherited from the parent snapshot (apt packages,
92    // installed runtimes, `node_modules`, …). Injecting only at the chain root
93    // is wrong: root steps such as `apt_base` are `CacheForever`, so their
94    // snapshots freeze the source tree captured at first build and every COW
95    // descendant inherits that stale tree — source edits never reach leaf
96    // steps. A true cache hit short-circuits inside `HmVm::execute` before
97    // inject runs, so this overlay only happens when a step actually executes;
98    // the overlay (Docker PUT-archive) adds/overwrites files without deleting
99    // the inherited system state.
100    let (inject, _temp_guard) = {
101        let archive_bytes = ctx
102            .archives
103            .get_bytes(input.workspace_archive_id)
104            .ok_or_else(|| anyhow::anyhow!("source archive not found"))?;
105        let dir =
106            extract_archive_to_tempdir(&archive_bytes).context("extracting workspace archive")?;
107        let path = dir.path().to_path_buf();
108        (Some(path), Some(dir))
109    };
110
111    // Baseline env for shell operation inside VMs.
112    let mut env: Vec<(String, String)> = vec![
113        ("HOME".into(), "/root".into()),
114        (
115            "PATH".into(),
116            "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin".into(),
117        ),
118    ];
119    env.extend(input.env);
120
121    let action = Action {
122        source,
123        cmd: input.step.cmd.clone(),
124        env,
125        working_dir: input.workdir.clone(),
126        timeout: None,
127        inject,
128    };
129
130    let sink = EventBusSink {
131        step_id: input.step_id,
132        bus: Arc::clone(&ctx.event_bus),
133    };
134
135    let result = tokio::select! {
136        r = vm.execute(action, policy, &sink) => r,
137        () = ctx.cancel.cancelled() => {
138            anyhow::bail!("step cancelled (build timeout or sibling failure)")
139        }
140    }
141    .context("vm execute failed")?;
142
143    if result.cached {
144        ctx.event_bus.emit(BuildEvent::StepCacheHit {
145            step_id: input.step_id,
146            key: input
147                .step
148                .cache
149                .as_ref()
150                .and_then(|c| c.key.clone())
151                .unwrap_or_default(),
152            tag: result
153                .snapshot
154                .as_ref()
155                .map_or_else(String::new, ToString::to_string),
156        });
157    }
158
159    Ok(StepResult {
160        exit_code: result.exit_code,
161        committed_snapshot: result.snapshot.map(|s| SnapshotRef(s.to_string())),
162        artifacts: vec![],
163    })
164}
165
166/// Extracts a gzipped tar archive into a temporary directory.
167fn extract_archive_to_tempdir(archive_bytes: &[u8]) -> Result<tempfile::TempDir> {
168    let temp_dir = tempfile::tempdir().context("creating temp directory")?;
169    let decoder = flate2::read::GzDecoder::new(archive_bytes);
170    let mut archive = tar::Archive::new(decoder);
171    archive
172        .unpack(temp_dir.path())
173        .context("unpacking archive")?;
174    Ok(temp_dir)
175}
176
177/// [`OutputSink`] implementation that emits [`BuildEvent::StepLog`]
178/// events on the [`EventBus`].
179struct EventBusSink {
180    step_id: Uuid,
181    bus: Arc<EventBus>,
182}
183
184impl std::fmt::Debug for EventBusSink {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        f.debug_struct("EventBusSink")
187            .field("step_id", &self.step_id)
188            .finish_non_exhaustive()
189    }
190}
191
192impl OutputSink for EventBusSink {
193    fn on_stdout(&self, line: &str) {
194        self.bus.emit(BuildEvent::StepLog {
195            step_id: self.step_id,
196            stream: StdStream::Stdout,
197            line: line.to_owned(),
198            ts: chrono::Utc::now(),
199        });
200    }
201
202    fn on_stderr(&self, line: &str) {
203        self.bus.emit(BuildEvent::StepLog {
204            step_id: self.step_id,
205            stream: StdStream::Stderr,
206            line: line.to_owned(),
207            ts: chrono::Utc::now(),
208        });
209    }
210}