hm_exec/local/runner/
vm.rs1use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11
12use anyhow::{Context, Result};
13use hm_plugin_protocol::{
14 BuildEvent, CacheDecision, ExecutorInput, SnapshotRef, StdStream, StepResult,
15};
16use hm_vm::types::OutputSink;
17use hm_vm::{Action, CachingPolicy, HmVm, ImageSource, SnapshotId};
18use uuid::Uuid;
19
20use super::{StepContext, StepRunner};
21use crate::local::events::EventBus;
22
23#[derive(Debug)]
26pub struct VmRunner {
27 vm: Arc<HmVm>,
28}
29
30impl VmRunner {
31 #[must_use]
33 pub const fn new(vm: Arc<HmVm>) -> Self {
34 Self { vm }
35 }
36}
37
38impl StepRunner for VmRunner {
39 fn name(&self) -> &'static str {
40 "vm"
41 }
42
43 fn execute(
44 &self,
45 ctx: &StepContext,
46 input: ExecutorInput,
47 ) -> Pin<Box<dyn Future<Output = Result<StepResult>> + Send + '_>> {
48 let ctx = ctx.clone();
49 let vm = Arc::clone(&self.vm);
50 Box::pin(async move { run_step_vm(&vm, &ctx, input).await })
51 }
52
53 fn reap_snapshots<'a>(
54 &'a self,
55 snapshots: Vec<SnapshotRef>,
56 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
57 let vm = Arc::clone(&self.vm);
58 Box::pin(async move {
59 for snap in snapshots {
60 let id = SnapshotId::new(snap.0);
61 if let Err(e) = vm.remove_snapshot(&id).await {
62 tracing::warn!(snapshot = %id, error = %e, "failed to reap ephemeral snapshot");
63 }
64 }
65 })
66 }
67}
68
69#[tracing::instrument(skip(vm, ctx), fields(step_key = %input.step.key))]
70async fn run_step_vm(vm: &HmVm, ctx: &StepContext, input: ExecutorInput) -> Result<StepResult> {
71 let policy = match &input.cache_lookup {
72 CacheDecision::Hit { tag } | CacheDecision::MissBuildAs { tag } => {
73 CachingPolicy::Cache { key: tag.0.clone() }
74 }
75 CacheDecision::MissNoCommit => CachingPolicy::None,
76 };
77
78 let source = if let Some(ref snap) = input.parent_snapshot {
79 ImageSource::Snapshot(SnapshotId::new(snap.0.clone()))
80 } else {
81 ImageSource::Image(
82 input
83 .step
84 .image
85 .clone()
86 .unwrap_or_else(|| "alpine:latest".to_string()),
87 )
88 };
89
90 let (inject, _temp_guard) = {
101 let archive_bytes = ctx
102 .archives
103 .get_bytes(input.workspace_archive_id)
104 .ok_or_else(|| anyhow::anyhow!("source archive not found"))?;
105 let dir =
106 extract_archive_to_tempdir(&archive_bytes).context("extracting workspace archive")?;
107 let path = dir.path().to_path_buf();
108 (Some(path), Some(dir))
109 };
110
111 let mut env: Vec<(String, String)> = vec![
113 ("HOME".into(), "/root".into()),
114 (
115 "PATH".into(),
116 "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin".into(),
117 ),
118 ];
119 env.extend(input.env);
120
121 let action = Action {
122 source,
123 cmd: input.step.cmd.clone(),
124 env,
125 working_dir: input.workdir.clone(),
126 timeout: None,
127 inject,
128 };
129
130 let sink = EventBusSink {
131 step_id: input.step_id,
132 bus: Arc::clone(&ctx.event_bus),
133 };
134
135 let result = tokio::select! {
136 r = vm.execute(action, policy, &sink) => r,
137 () = ctx.cancel.cancelled() => {
138 anyhow::bail!("step cancelled (build timeout or sibling failure)")
139 }
140 }
141 .context("vm execute failed")?;
142
143 if result.cached {
144 ctx.event_bus.emit(BuildEvent::StepCacheHit {
145 step_id: input.step_id,
146 key: input
147 .step
148 .cache
149 .as_ref()
150 .and_then(|c| c.key.clone())
151 .unwrap_or_default(),
152 tag: result
153 .snapshot
154 .as_ref()
155 .map_or_else(String::new, ToString::to_string),
156 });
157 }
158
159 Ok(StepResult {
160 exit_code: result.exit_code,
161 committed_snapshot: result.snapshot.map(|s| SnapshotRef(s.to_string())),
162 artifacts: vec![],
163 })
164}
165
166fn extract_archive_to_tempdir(archive_bytes: &[u8]) -> Result<tempfile::TempDir> {
168 let temp_dir = tempfile::tempdir().context("creating temp directory")?;
169 let decoder = flate2::read::GzDecoder::new(archive_bytes);
170 let mut archive = tar::Archive::new(decoder);
171 archive
172 .unpack(temp_dir.path())
173 .context("unpacking archive")?;
174 Ok(temp_dir)
175}
176
177struct EventBusSink {
180 step_id: Uuid,
181 bus: Arc<EventBus>,
182}
183
184impl std::fmt::Debug for EventBusSink {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.debug_struct("EventBusSink")
187 .field("step_id", &self.step_id)
188 .finish_non_exhaustive()
189 }
190}
191
192impl OutputSink for EventBusSink {
193 fn on_stdout(&self, line: &str) {
194 self.bus.emit(BuildEvent::StepLog {
195 step_id: self.step_id,
196 stream: StdStream::Stdout,
197 line: line.to_owned(),
198 ts: chrono::Utc::now(),
199 });
200 }
201
202 fn on_stderr(&self, line: &str) {
203 self.bus.emit(BuildEvent::StepLog {
204 step_id: self.step_id,
205 stream: StdStream::Stderr,
206 line: line.to_owned(),
207 ts: chrono::Utc::now(),
208 });
209 }
210}