hm_exec/local/runner/
vm.rs1use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use anyhow::{Context, Result};
13use hm_plugin_protocol::{
14 BuildEvent, CacheDecision, ExecutorInput, SnapshotRef, StdStream, StepResult,
15};
16use hm_vm::types::OutputSink;
17use hm_vm::{Action, CachingPolicy, HmVm, ImageSource, SnapshotId};
18use uuid::Uuid;
19
20use super::{StepContext, StepRunner};
21use crate::local::events::EventBus;
22
23#[derive(Debug)]
26pub struct VmRunner {
27 vm: Arc<HmVm>,
28}
29
30impl VmRunner {
31 #[must_use]
33 pub const fn new(vm: Arc<HmVm>) -> Self {
34 Self { vm }
35 }
36}
37
38impl StepRunner for VmRunner {
39 fn name(&self) -> &'static str {
40 "vm"
41 }
42
43 fn execute(
44 &self,
45 ctx: &StepContext,
46 input: ExecutorInput,
47 ) -> Pin<Box<dyn Future<Output = Result<StepResult>> + Send + '_>> {
48 let ctx = ctx.clone();
49 let vm = Arc::clone(&self.vm);
50 Box::pin(async move { run_step_vm(&vm, &ctx, input).await })
51 }
52
53 fn reap_snapshots<'a>(
54 &'a self,
55 snapshots: Vec<SnapshotRef>,
56 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
57 let vm = Arc::clone(&self.vm);
58 Box::pin(async move {
59 for snap in snapshots {
60 let id = SnapshotId::new(snap.0);
61 if let Err(e) = vm.remove_snapshot(&id).await {
62 tracing::warn!(snapshot = %id, error = %e, "failed to reap ephemeral snapshot");
63 }
64 }
65 })
66 }
67}
68
69#[tracing::instrument(skip(vm, ctx), fields(step_key = %input.step.key))]
70async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Result<StepResult> {
71 let policy = match &input.cache_lookup {
72 CacheDecision::Hit { tag } | CacheDecision::MissBuildAs { tag } => {
73 CachingPolicy::Cache { key: tag.0.clone() }
74 }
75 CacheDecision::MissNoCommit => CachingPolicy::None,
76 };
77
78 let source = if let Some(ref snap) = input.parent_snapshot {
79 ImageSource::Snapshot(SnapshotId::new(snap.0.clone()))
80 } else {
81 ImageSource::Image(
85 input
86 .step
87 .image
88 .clone()
89 .unwrap_or_else(|| "ubuntu:24.04".to_string()),
90 )
91 };
92
93 let (inject, _temp_guard) = {
104 let archive_bytes = ctx
105 .archives
106 .get_bytes(input.workspace_archive_id)
107 .ok_or_else(|| anyhow::anyhow!("source archive not found"))?;
108 let dir =
109 extract_archive_to_tempdir(&archive_bytes).context("extracting workspace archive")?;
110 let path = dir.path().to_path_buf();
111 (Some(path), Some(dir))
112 };
113
114 let mut env: Vec<(String, String)> = vec![
116 ("HOME".into(), "/root".into()),
117 (
118 "PATH".into(),
119 "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin".into(),
120 ),
121 ];
122 env.extend(input.env);
123
124 let action = Action {
125 source,
126 cmd: input.step.cmd.clone(),
127 env,
128 working_dir: input.workdir.clone(),
129 timeout: None,
130 inject,
131 };
132
133 let sink = EventBusSink {
134 step_id: input.step_id,
135 bus: Arc::clone(&ctx.event_bus),
136 };
137
138 let result = tokio::select! {
139 r = vm.execute(action, policy, &sink) => r,
140 () = ctx.cancel.cancelled() => {
141 anyhow::bail!("step cancelled (build timeout or sibling failure)")
142 }
143 }
144 .context("vm execute failed")?;
145
146 if result.cached {
147 ctx.event_bus.emit(BuildEvent::StepCacheHit {
148 step_id: input.step_id,
149 key: input
150 .step
151 .cache
152 .as_ref()
153 .and_then(|c| c.key.clone())
154 .unwrap_or_default(),
155 tag: result
156 .snapshot
157 .as_ref()
158 .map_or_else(String::new, ToString::to_string),
159 });
160 }
161
162 Ok(StepResult {
163 exit_code: result.exit_code,
164 committed_snapshot: result.snapshot.map(|s| SnapshotRef(s.to_string())),
165 artifacts: vec![],
166 })
167}
168
169fn extract_archive_to_tempdir(archive_bytes: &[u8]) -> Result<tempfile::TempDir> {
171 let temp_dir = tempfile::tempdir().context("creating temp directory")?;
172 let decoder = flate2::read::GzDecoder::new(archive_bytes);
173 let mut archive = tar::Archive::new(decoder);
174 archive
175 .unpack(temp_dir.path())
176 .context("unpacking archive")?;
177 Ok(temp_dir)
178}
179
180struct EventBusSink {
183 step_id: Uuid,
184 bus: Arc<EventBus>,
185}
186
187impl std::fmt::Debug for EventBusSink {
188 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 f.debug_struct("EventBusSink")
190 .field("step_id", &self.step_id)
191 .finish_non_exhaustive()
192 }
193}
194
195impl OutputSink for EventBusSink {
196 fn on_stdout(&self, line: &str) {
197 self.bus.emit(BuildEvent::StepLog {
198 step_id: self.step_id,
199 stream: StdStream::Stdout,
200 line: line.to_owned(),
201 ts: chrono::Utc::now(),
202 });
203 }
204
205 fn on_stderr(&self, line: &str) {
206 self.bus.emit(BuildEvent::StepLog {
207 step_id: self.step_id,
208 stream: StdStream::Stderr,
209 line: line.to_owned(),
210 ts: chrono::Utc::now(),
211 });
212 }
213}