1use std::sync::Arc;
4
5use anyhow::Result;
6use tracing::{instrument, warn};
7
8use crate::backend::VmBackend;
9use crate::registry::ImageRegistry;
10use crate::types::{
11 Action, CachingPolicy, ExecutionResult, ImageSource, OutputSink, SnapshotId, SnapshotLabel,
12 VmConfig,
13};
14
15#[derive(Debug)]
21pub struct HmVm {
22 backend: Arc<dyn VmBackend>,
23 registry: ImageRegistry,
24 config: VmConfig,
25}
26
27impl HmVm {
28 pub fn new(backend: Arc<dyn VmBackend>, registry: ImageRegistry, config: VmConfig) -> Self {
30 Self {
31 backend,
32 registry,
33 config,
34 }
35 }
36
37 #[instrument(skip(self, action, sink), fields(cmd = %action.cmd))]
51 pub async fn execute(
52 &self,
53 action: Action,
54 policy: CachingPolicy,
55 sink: &dyn OutputSink,
56 ) -> Result<ExecutionResult> {
57 if let CachingPolicy::Cache { ref key } = policy
59 && let Some(snap) = self.registry.get(key)
60 {
61 if self.backend.snapshot_exists(&snap).await? {
62 return Ok(ExecutionResult {
63 exit_code: 0,
64 snapshot: Some(snap),
65 cached: true,
66 });
67 }
68 let _ = self.registry.invalidate(key);
69 }
70
71 let mut vm = match &action.source {
73 ImageSource::Image(image) => self.backend.create(image, &self.config).await?,
74 ImageSource::Snapshot(snap) => self.backend.restore(snap, &self.config).await?,
75 };
76
77 let result = self.run_in_vm(&mut *vm, &action, &policy, sink).await;
78
79 vm.destroy().await.ok();
81
82 result
83 }
84
85 pub async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()> {
96 self.backend.remove_snapshot(snapshot).await
97 }
98
99 async fn run_in_vm(
102 &self,
103 vm: &mut dyn crate::backend::Vm,
104 action: &Action,
105 policy: &CachingPolicy,
106 sink: &dyn OutputSink,
107 ) -> Result<ExecutionResult> {
108 if let Some(ref host_path) = action.inject {
110 vm.inject(host_path, &action.working_dir).await?;
111 }
112
113 let exec_fut = vm.exec(&action.cmd, &action.env, &action.working_dir, sink);
115 let exit_code = if let Some(timeout) = action.timeout {
116 match tokio::time::timeout(timeout, exec_fut).await {
117 Ok(result) => result?,
118 Err(_) => anyhow::bail!("command timed out after {timeout:?}"),
119 }
120 } else {
121 exec_fut.await?
122 };
123
124 let snapshot = if exit_code == 0 {
126 let label = match policy {
127 CachingPolicy::Cache { key } => SnapshotLabel::Cached(key.clone()),
128 CachingPolicy::None => SnapshotLabel::Ephemeral,
129 };
130 let snap = vm.snapshot(&label).await?;
131
132 if let CachingPolicy::Cache { key } = policy {
133 let evicted = self.registry.put(key, &snap);
134 for old in &evicted {
135 if let Err(e) = self.backend.remove_snapshot(old).await {
136 warn!(snapshot = %old, error = %e, "failed to remove evicted snapshot");
137 }
138 }
139 }
140
141 Some(snap)
142 } else {
143 None
144 };
145
146 Ok(ExecutionResult {
147 exit_code,
148 snapshot,
149 cached: false,
150 })
151 }
152}
153
#[cfg(test)]
// Panicking through `unwrap`/`expect` is the intended failure mode in tests.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::backend::Vm;
    use crate::types::{NullSink, SnapshotId};

    use std::path::Path;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    use async_trait::async_trait;

    /// Ordered log of backend/VM calls, shared between a backend and its VMs.
    type CallLog = Arc<Mutex<Vec<String>>>;

    /// Locks the call log, recovering the guard if the mutex was poisoned.
    ///
    /// Recovering (rather than ignoring the error) guarantees that no call is
    /// silently dropped and that the log is never reported as empty, which
    /// would let negative assertions pass vacuously.
    fn lock_log(log: &Mutex<Vec<String>>) -> MutexGuard<'_, Vec<String>> {
        log.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Appends one entry to the call log.
    fn record(log: &Mutex<Vec<String>>, entry: impl Into<String>) {
        lock_log(log).push(entry.into());
    }

    /// Backend double that records every call and hands out [`MockVm`]s.
    #[derive(Debug, Clone)]
    struct MockBackend {
        /// Calls made on this backend and on every VM it produced.
        calls: CallLog,
        /// Exit code every `exec` on a produced VM reports.
        exit_code: i32,
        /// Answer returned by `snapshot_exists`.
        snapshot_exists: bool,
    }

    impl MockBackend {
        /// Creates a backend with an empty call log.
        fn new(exit_code: i32, snapshot_exists: bool) -> Self {
            Self {
                calls: Arc::new(Mutex::new(Vec::new())),
                exit_code,
                snapshot_exists,
            }
        }

        /// Builds a VM that shares this backend's call log and exit code.
        fn boxed_vm(&self) -> Box<dyn Vm> {
            Box::new(MockVm {
                calls: Arc::clone(&self.calls),
                exit_code: self.exit_code,
            })
        }
    }

    #[async_trait]
    impl VmBackend for MockBackend {
        async fn create(&self, image: &str, _config: &VmConfig) -> Result<Box<dyn Vm>> {
            record(&self.calls, format!("create:{image}"));
            Ok(self.boxed_vm())
        }

        async fn restore(&self, snapshot: &SnapshotId, _config: &VmConfig) -> Result<Box<dyn Vm>> {
            record(&self.calls, format!("restore:{snapshot}"));
            Ok(self.boxed_vm())
        }

        async fn snapshot_exists(&self, snapshot: &SnapshotId) -> Result<bool> {
            record(&self.calls, format!("snapshot_exists:{snapshot}"));
            Ok(self.snapshot_exists)
        }

        async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()> {
            record(&self.calls, format!("remove_snapshot:{snapshot}"));
            Ok(())
        }
    }

    /// VM double: records calls and reports a fixed exit code from `exec`.
    struct MockVm {
        calls: CallLog,
        exit_code: i32,
    }

    #[async_trait]
    impl Vm for MockVm {
        async fn inject(&self, host_path: &Path, guest_path: &str) -> Result<()> {
            record(
                &self.calls,
                format!("inject:{}:{guest_path}", host_path.display()),
            );
            Ok(())
        }

        async fn exec(
            &self,
            cmd: &str,
            _env: &[(String, String)],
            _working_dir: &str,
            _sink: &dyn OutputSink,
        ) -> Result<i32> {
            record(&self.calls, format!("exec:{cmd}"));
            Ok(self.exit_code)
        }

        async fn snapshot(&mut self, label: &SnapshotLabel) -> Result<SnapshotId> {
            let label = match label {
                SnapshotLabel::Ephemeral => "ephemeral".to_string(),
                SnapshotLabel::Cached(key) => key.clone(),
            };
            record(&self.calls, format!("snapshot:{label}"));
            Ok(SnapshotId::new(format!("snap-{label}")))
        }

        async fn destroy(&mut self) -> Result<()> {
            record(&self.calls, "destroy");
            Ok(())
        }
    }

    /// Opens a registry backed by a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the registry
    /// is used; dropping it deletes the directory.
    fn open_temp_registry(capacity: u64) -> (ImageRegistry, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let db = dir.path().join("registry.db");
        let capacity = std::num::NonZeroU64::new(capacity).expect("capacity must be non-zero");
        let reg = ImageRegistry::open(&db, capacity).expect("failed to open registry");
        (reg, dir)
    }

    /// Action used by most tests: image source, no timeout, one injected path.
    fn make_action() -> Action {
        Action {
            source: ImageSource::Image("alpine:latest".into()),
            cmd: "echo hello".into(),
            env: vec![],
            working_dir: "/work".into(),
            timeout: None,
            inject: Some(std::path::PathBuf::from("/host/src")),
        }
    }

    /// Snapshot of everything recorded so far, in call order.
    fn calls(backend: &MockBackend) -> Vec<String> {
        lock_log(&backend.calls).clone()
    }

    /// True if any recorded call starts with `prefix`.
    fn logged(log: &[String], prefix: &str) -> bool {
        log.iter().any(|c| c.starts_with(prefix))
    }

    /// Caching policy for `key`.
    fn cache(key: &str) -> CachingPolicy {
        CachingPolicy::Cache { key: key.into() }
    }

    #[tokio::test]
    async fn cache_miss_creates_executes_and_snapshots() {
        let backend = MockBackend::new(0, false);
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache("step-1"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(!result.cached);
        assert!(result.snapshot.is_some());

        let log = calls(&backend);
        assert!(logged(&log, "create:"));
        assert!(logged(&log, "inject:"));
        assert!(logged(&log, "exec:"));
        assert!(logged(&log, "snapshot:"));
        assert!(log.iter().any(|c| c == "destroy"));
    }

    #[tokio::test]
    async fn cache_hit_skips_execution() {
        let backend = MockBackend::new(0, true);
        let (registry, _dir) = open_temp_registry(10);

        registry.put("step-1", &SnapshotId::new("cached-snap"));

        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache("step-1"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(result.cached);
        assert_eq!(result.snapshot, Some(SnapshotId::new("cached-snap")));

        let log = calls(&backend);
        assert!(logged(&log, "snapshot_exists:"));
        assert!(!logged(&log, "create:"));
        assert!(!logged(&log, "exec:"));
    }

    #[tokio::test]
    async fn stale_cache_entry_reexecutes() {
        // The registry knows the key, but the backend says the snapshot is gone.
        let backend = MockBackend::new(0, false);
        let (registry, _dir) = open_temp_registry(10);

        registry.put("step-1", &SnapshotId::new("cached-snap"));

        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache("step-1"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(!result.cached);
        assert_eq!(result.snapshot, Some(SnapshotId::new("snap-step-1")));

        let log = calls(&backend);
        assert!(logged(&log, "snapshot_exists:"));
        assert!(logged(&log, "create:"));
        assert!(logged(&log, "exec:"));
        assert!(log.iter().any(|c| c == "destroy"));
    }

    #[tokio::test]
    async fn snapshot_source_restores_instead_of_creating() {
        let backend = MockBackend::new(0, false);
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let action = Action {
            source: ImageSource::Snapshot(SnapshotId::new("base-snap")),
            ..make_action()
        };

        let result = hm
            .execute(action, CachingPolicy::None, &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(!result.cached);

        let log = calls(&backend);
        assert!(logged(&log, "restore:"));
        assert!(!logged(&log, "create:"));
        assert!(logged(&log, "exec:"));
    }

    #[tokio::test]
    async fn no_cache_policy_does_not_store() {
        let backend = MockBackend::new(0, false);
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), CachingPolicy::None, &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(!result.cached);

        let log = calls(&backend);
        assert!(logged(&log, "exec:"));

        assert!(hm.registry.is_empty());
    }

    #[tokio::test]
    async fn nonzero_exit_does_not_cache() {
        let backend = MockBackend::new(1, false);
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache("step-fail"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 1);
        assert!(!result.cached);
        assert!(result.snapshot.is_none());

        let log = calls(&backend);
        assert!(logged(&log, "exec:"));
        assert!(!logged(&log, "snapshot:"));
        // The VM is torn down even though the command failed.
        assert!(log.iter().any(|c| c == "destroy"));

        assert!(hm.registry.is_empty());
    }
}