Skip to main content

hm_vm/
vm.rs

//! High-level VM orchestrator.
2
3use std::sync::Arc;
4
5use anyhow::Result;
6use tracing::{instrument, warn};
7
8use crate::backend::VmBackend;
9use crate::registry::ImageRegistry;
10use crate::types::{
11    Action, CachingPolicy, ExecutionResult, ImageSource, OutputSink, SnapshotId, SnapshotLabel,
12    VmConfig,
13};
14
15/// High-level orchestrator that drives the VM lifecycle.
16///
17/// `HmVm` composes a [`VmBackend`] with an [`ImageRegistry`] to provide
18/// cache-aware execution: if a cached snapshot already exists for a given
19/// caching key the expensive create-inject-exec cycle is skipped entirely.
20#[derive(Debug)]
21pub struct HmVm {
22    backend: Arc<dyn VmBackend>,
23    registry: ImageRegistry,
24    config: VmConfig,
25}
26
27impl HmVm {
28    /// Create a new orchestrator from the given backend, registry and config.
29    pub fn new(backend: Arc<dyn VmBackend>, registry: ImageRegistry, config: VmConfig) -> Self {
30        Self {
31            backend,
32            registry,
33            config,
34        }
35    }
36
37    /// Execute an [`Action`] inside a VM, obeying the given [`CachingPolicy`].
38    ///
39    /// # Cache behaviour
40    ///
41    /// When the policy is [`CachingPolicy::Cache`] the registry is consulted
42    /// first. A cache hit that still exists in the backend returns immediately.
43    /// On a successful (exit-code 0) execution the resulting snapshot is stored
44    /// in the registry; evicted entries are cleaned up in the backend.
45    ///
46    /// # Errors
47    ///
48    /// Returns an error if the backend fails to create, restore, inject, or
49    /// execute. Best-effort cleanup is performed even on failure paths.
50    #[instrument(skip(self, action, sink), fields(cmd = %action.cmd))]
51    pub async fn execute(
52        &self,
53        action: Action,
54        policy: CachingPolicy,
55        sink: &dyn OutputSink,
56    ) -> Result<ExecutionResult> {
57        // 1. Cache check
58        if let CachingPolicy::Cache { ref key } = policy
59            && let Some(snap) = self.registry.get(key)
60        {
61            if self.backend.snapshot_exists(&snap).await? {
62                return Ok(ExecutionResult {
63                    exit_code: 0,
64                    snapshot: Some(snap),
65                    cached: true,
66                });
67            }
68            let _ = self.registry.invalidate(key);
69        }
70
71        // 2. Create or restore VM
72        let mut vm = match &action.source {
73            ImageSource::Image(image) => self.backend.create(image, &self.config).await?,
74            ImageSource::Snapshot(snap) => self.backend.restore(snap, &self.config).await?,
75        };
76
77        let result = self.run_in_vm(&mut *vm, &action, &policy, sink).await;
78
79        // Always destroy the VM, even on error.
80        vm.destroy().await.ok();
81
82        result
83    }
84
85    /// Remove a snapshot from the backend store.
86    ///
87    /// Used to reap ephemeral (uncached) leaf snapshots once a run finishes —
88    /// `CachingPolicy::None` commits a transient `ephemeral:*` image purely for
89    /// downstream container lineage, and nothing in the registry ever evicts
90    /// it. The scheduler reaps these explicitly at run end.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the backend fails to remove the snapshot.
95    pub async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()> {
96        self.backend.remove_snapshot(snapshot).await
97    }
98
99    /// Inner lifecycle: inject, exec, snapshot. Separated so the caller
100    /// can guarantee `vm.destroy()` runs regardless of outcome.
101    async fn run_in_vm(
102        &self,
103        vm: &mut dyn crate::backend::Vm,
104        action: &Action,
105        policy: &CachingPolicy,
106        sink: &dyn OutputSink,
107    ) -> Result<ExecutionResult> {
108        // 3. Inject workspace
109        if let Some(ref host_path) = action.inject {
110            vm.inject(host_path, &action.working_dir).await?;
111        }
112
113        // 4. Execute command (with optional timeout)
114        let exec_fut = vm.exec(&action.cmd, &action.env, &action.working_dir, sink);
115        let exit_code = if let Some(timeout) = action.timeout {
116            match tokio::time::timeout(timeout, exec_fut).await {
117                Ok(result) => result?,
118                Err(_) => anyhow::bail!("command timed out after {timeout:?}"),
119            }
120        } else {
121            exec_fut.await?
122        };
123
124        // 5. Snapshot and cache on success
125        let snapshot = if exit_code == 0 {
126            let label = match policy {
127                CachingPolicy::Cache { key } => SnapshotLabel::Cached(key.clone()),
128                CachingPolicy::None => SnapshotLabel::Ephemeral,
129            };
130            let snap = vm.snapshot(&label).await?;
131
132            if let CachingPolicy::Cache { key } = policy {
133                let evicted = self.registry.put(key, &snap);
134                for old in &evicted {
135                    if let Err(e) = self.backend.remove_snapshot(old).await {
136                        warn!(snapshot = %old, error = %e, "failed to remove evicted snapshot");
137                    }
138                }
139            }
140
141            Some(snap)
142        } else {
143            None
144        };
145
146        Ok(ExecutionResult {
147            exit_code,
148            snapshot,
149            cached: false,
150        })
151    }
152}
153
#[cfg(test)]
// Test code may panic freely: a failed `expect` is simply a failed test.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::backend::Vm;
    use crate::types::{NullSink, SnapshotId};

    use std::path::Path;
    use std::sync::Mutex;
    use std::time::Duration;

    use async_trait::async_trait;

    // ------------------------------------------------------------------ //
    // Mock backend + VM                                                    //
    // ------------------------------------------------------------------ //

    /// Ordered log of every backend/VM call, shared between the mock backend
    /// and the VMs it hands out.
    type CallLog = Arc<Mutex<Vec<String>>>;

    /// Append `entry` to `log`. A poisoned mutex is ignored so that a panic in
    /// one place does not cascade into unrelated recording sites.
    fn record(log: &CallLog, entry: String) {
        if let Ok(mut entries) = log.lock() {
            entries.push(entry);
        }
    }

    /// Backend double that records calls and returns canned answers.
    #[derive(Debug, Clone)]
    struct MockBackend {
        calls: CallLog,
        /// Exit code that `MockVm::exec` will return.
        exit_code: i32,
        /// Whether `snapshot_exists` should return true.
        snapshot_exists: bool,
        /// How long `MockVm::exec` sleeps before returning (zero = no sleep).
        exec_delay: Duration,
    }

    impl MockBackend {
        fn new(exit_code: i32, snapshot_exists: bool) -> Self {
            Self {
                calls: Arc::new(Mutex::new(Vec::new())),
                exit_code,
                snapshot_exists,
                exec_delay: Duration::ZERO,
            }
        }

        /// Make every `exec` on VMs from this backend take at least `delay`.
        fn with_exec_delay(mut self, delay: Duration) -> Self {
            self.exec_delay = delay;
            self
        }

        /// Build a VM that shares this backend's call log and settings.
        fn spawn_vm(&self) -> Box<dyn Vm> {
            Box::new(MockVm {
                calls: Arc::clone(&self.calls),
                exit_code: self.exit_code,
                exec_delay: self.exec_delay,
            })
        }
    }

    #[async_trait]
    impl VmBackend for MockBackend {
        async fn create(&self, image: &str, _config: &VmConfig) -> Result<Box<dyn Vm>> {
            record(&self.calls, format!("create:{image}"));
            Ok(self.spawn_vm())
        }

        async fn restore(&self, snapshot: &SnapshotId, _config: &VmConfig) -> Result<Box<dyn Vm>> {
            record(&self.calls, format!("restore:{snapshot}"));
            Ok(self.spawn_vm())
        }

        async fn snapshot_exists(&self, snapshot: &SnapshotId) -> Result<bool> {
            record(&self.calls, format!("snapshot_exists:{snapshot}"));
            Ok(self.snapshot_exists)
        }

        async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()> {
            record(&self.calls, format!("remove_snapshot:{snapshot}"));
            Ok(())
        }
    }

    /// VM double: every operation succeeds and is written to the call log.
    struct MockVm {
        calls: CallLog,
        exit_code: i32,
        exec_delay: Duration,
    }

    #[async_trait]
    impl Vm for MockVm {
        async fn inject(&self, host_path: &Path, guest_path: &str) -> Result<()> {
            record(
                &self.calls,
                format!("inject:{}:{guest_path}", host_path.display()),
            );
            Ok(())
        }

        async fn exec(
            &self,
            cmd: &str,
            _env: &[(String, String)],
            _working_dir: &str,
            _sink: &dyn OutputSink,
        ) -> Result<i32> {
            // Record first so the call is visible even if the caller times out
            // and drops this future while it is sleeping.
            record(&self.calls, format!("exec:{cmd}"));
            if !self.exec_delay.is_zero() {
                tokio::time::sleep(self.exec_delay).await;
            }
            Ok(self.exit_code)
        }

        async fn snapshot(&mut self, label: &SnapshotLabel) -> Result<SnapshotId> {
            let label = match label {
                SnapshotLabel::Ephemeral => "ephemeral".to_string(),
                SnapshotLabel::Cached(key) => key.clone(),
            };
            record(&self.calls, format!("snapshot:{label}"));
            Ok(SnapshotId::new(format!("snap-{label}")))
        }

        async fn destroy(&mut self) -> Result<()> {
            record(&self.calls, "destroy".into());
            Ok(())
        }
    }

    // ------------------------------------------------------------------ //
    // Helpers                                                              //
    // ------------------------------------------------------------------ //

    /// Open a registry backed by a fresh temp directory. The `TempDir` is
    /// returned so the caller keeps it alive for the duration of the test.
    fn open_temp_registry(capacity: u64) -> (ImageRegistry, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("failed to create temp dir");
        let db = dir.path().join("registry.db");
        let capacity = std::num::NonZeroU64::new(capacity).expect("capacity must be non-zero");
        let reg = ImageRegistry::open(&db, capacity).expect("failed to open registry");
        (reg, dir)
    }

    fn make_action() -> Action {
        Action {
            source: ImageSource::Image("alpine:latest".into()),
            cmd: "echo hello".into(),
            env: vec![],
            working_dir: "/work".into(),
            timeout: None,
            inject: Some(std::path::PathBuf::from("/host/src")),
        }
    }

    /// Shorthand for a `CachingPolicy::Cache` with the given key.
    fn cache_policy(key: &str) -> CachingPolicy {
        CachingPolicy::Cache { key: key.into() }
    }

    /// Snapshot of everything recorded so far.
    fn calls(backend: &MockBackend) -> Vec<String> {
        backend.calls.lock().map_or_else(|_| vec![], |c| c.clone())
    }

    /// True if any recorded call starts with `prefix`.
    fn has_call(log: &[String], prefix: &str) -> bool {
        log.iter().any(|c| c.starts_with(prefix))
    }

    // ------------------------------------------------------------------ //
    // Tests                                                                //
    // ------------------------------------------------------------------ //

    #[tokio::test]
    async fn cache_miss_creates_executes_and_snapshots() {
        let backend = MockBackend::new(0, false);
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache_policy("step-1"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(!result.cached);
        assert!(result.snapshot.is_some());

        let log = calls(&backend);
        assert!(has_call(&log, "create:"));
        assert!(has_call(&log, "inject:"));
        assert!(has_call(&log, "exec:"));
        assert!(has_call(&log, "snapshot:"));
        assert!(log.iter().any(|c| c == "destroy"));
    }

    #[tokio::test]
    async fn cache_hit_skips_execution() {
        let backend = MockBackend::new(0, true);
        let (registry, _dir) = open_temp_registry(10);

        // Pre-populate the registry.
        registry.put("step-1", &SnapshotId::new("cached-snap"));

        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache_policy("step-1"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(result.cached);
        assert_eq!(result.snapshot, Some(SnapshotId::new("cached-snap")));

        let log = calls(&backend);
        // Only snapshot_exists should have been called -- no create, exec, etc.
        assert!(has_call(&log, "snapshot_exists:"));
        assert!(!has_call(&log, "create:"));
        assert!(!has_call(&log, "exec:"));
    }

    #[tokio::test]
    async fn stale_cache_entry_falls_back_to_execution() {
        // The registry knows the key, but the backend reports the snapshot gone.
        let backend = MockBackend::new(0, false);
        let (registry, _dir) = open_temp_registry(10);
        registry.put("step-1", &SnapshotId::new("stale-snap"));

        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache_policy("step-1"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(!result.cached);
        // The fresh snapshot is returned, not the stale registry entry.
        assert_eq!(result.snapshot, Some(SnapshotId::new("snap-step-1")));

        let log = calls(&backend);
        assert!(has_call(&log, "snapshot_exists:"));
        assert!(has_call(&log, "create:"));
        assert!(has_call(&log, "exec:"));
        assert!(has_call(&log, "snapshot:step-1"));
        assert!(log.iter().any(|c| c == "destroy"));
    }

    #[tokio::test]
    async fn no_cache_policy_does_not_store() {
        let backend = MockBackend::new(0, false);
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), CachingPolicy::None, &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(!result.cached);

        // Exec should have run, and the snapshot taken must be the ephemeral one.
        let log = calls(&backend);
        assert!(has_call(&log, "exec:"));
        assert!(has_call(&log, "snapshot:ephemeral"));

        // Registry should be empty -- no caching performed.
        assert!(hm.registry.is_empty());
    }

    #[tokio::test]
    async fn nonzero_exit_does_not_cache() {
        let backend = MockBackend::new(1, false);
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let result = hm
            .execute(make_action(), cache_policy("step-fail"), &NullSink)
            .await
            .expect("execute should succeed");

        assert_eq!(result.exit_code, 1);
        assert!(!result.cached);
        assert!(result.snapshot.is_none());

        let log = calls(&backend);
        // Exec should have run but no snapshot taken; the VM is still torn down.
        assert!(has_call(&log, "exec:"));
        assert!(!has_call(&log, "snapshot:"));
        assert!(log.iter().any(|c| c == "destroy"));

        // Registry should still be empty.
        assert!(hm.registry.is_empty());
    }

    #[tokio::test]
    async fn timeout_returns_error_and_still_destroys_vm() {
        // Exec would take far longer than the action's timeout allows.
        let backend = MockBackend::new(0, false).with_exec_delay(Duration::from_secs(30));
        let (registry, _dir) = open_temp_registry(10);
        let hm = HmVm::new(Arc::new(backend.clone()), registry, VmConfig::default());

        let mut action = make_action();
        action.timeout = Some(Duration::from_millis(10));

        let outcome = hm
            .execute(action, cache_policy("step-slow"), &NullSink)
            .await;

        // An `Ok` outcome yields an empty message and fails the assertion below.
        let message = outcome.err().map(|e| e.to_string()).unwrap_or_default();
        assert!(
            message.contains("timed out"),
            "expected a timeout error, got: {message:?}"
        );

        let log = calls(&backend);
        // The command started, nothing was snapshotted, and cleanup still ran.
        assert!(has_call(&log, "exec:"));
        assert!(!has_call(&log, "snapshot:"));
        assert!(log.iter().any(|c| c == "destroy"));

        // A timed-out run must not populate the cache.
        assert!(hm.registry.is_empty());
    }
}