Skip to main content

hm_exec/local/
backend.rs

1//! [`LocalBackend`]: runs the build in-process via the DAG scheduler.
2//!
3//! Each step is executed inside a lightweight VM by the [`VmRunner`], which
4//! drives the [`hm_vm`] subsystem. The VM backend (Docker, etc.) is injected;
5//! snapshot caching is owned by `hm-vm`'s [`hm_vm::ImageRegistry`].
6
7use std::num::NonZeroU64;
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13
14use hm_vm::{HmVm, ImageRegistry, VmBackend, VmConfig};
15
16use crate::local::{RunnerRegistry, VmRunner};
17use crate::{BackendError, BackendHandle, Capabilities, ExecutionBackend, Result, RunRequest};
18
19/// Number of cached snapshots the image registry retains before evicting
20/// least-recently-used entries.
21const REGISTRY_CAPACITY: NonZeroU64 = NonZeroU64::new(64).expect("64 is non-zero");
22
23/// Runs the build locally via the in-process DAG scheduler, executing each
24/// step inside a VM provided by the injected [`hm_vm::VmBackend`].
25///
26/// Constructed once and reused across multiple `start` calls.
27/// `parallelism` controls the maximum number of concurrently running step
28/// chains; the scheduler serialises within each chain regardless.
29#[derive(Debug)]
30pub struct LocalBackend {
31    parallelism: NonZeroUsize,
32    vm_backend: Arc<dyn VmBackend>,
33}
34
35impl LocalBackend {
36    /// Build a backend that executes steps on the given [`hm_vm::VmBackend`].
37    ///
38    /// `parallelism` = max concurrent step chains. The [`NonZeroUsize`] type
39    /// makes the scheduler's semaphore construction deadlock-free by
40    /// construction (a zero-permit semaphore would stall every step).
41    #[must_use]
42    pub fn new(parallelism: NonZeroUsize, vm_backend: Arc<dyn VmBackend>) -> Self {
43        Self {
44            parallelism,
45            vm_backend,
46        }
47    }
48
49    /// Build the runner registry, constructing the [`HmVm`] orchestrator
50    /// (VM backend + snapshot registry) and registering the [`VmRunner`] as
51    /// the default runner.
52    fn build_registry(&self) -> Result<RunnerRegistry> {
53        let cache_dir = hm_util::dirs::hm_cache_dir().ok_or_else(|| {
54            BackendError::Local("cannot resolve the Harmont cache directory".into())
55        })?;
56        let registry = ImageRegistry::open(&cache_dir.join("registry.db"), REGISTRY_CAPACITY)
57            .map_err(|e| BackendError::Local(format!("opening snapshot registry: {e:#}")))?;
58
59        let config = VmConfig {
60            memory_mib: Some(8192),
61            disk_size_gb: Some(10),
62            ..Default::default()
63        };
64
65        let hmvm = HmVm::new(Arc::clone(&self.vm_backend), registry, config);
66
67        let mut runners = RunnerRegistry::new();
68        runners.register(Arc::new(VmRunner::new(Arc::new(hmvm))), true);
69        Ok(runners)
70    }
71}
72
73#[async_trait::async_trait]
74impl ExecutionBackend for LocalBackend {
75    fn name(&self) -> &'static str {
76        "local"
77    }
78
79    fn capabilities(&self) -> Capabilities {
80        Capabilities::local()
81    }
82
83    async fn start(&self, req: RunRequest) -> Result<BackendHandle> {
84        let registry = Arc::new(self.build_registry()?);
85        let (tx, rx) = mpsc::channel(1024);
86        let cancel = CancellationToken::new();
87        let parallelism = self.parallelism;
88        let keep_going = req.options.keep_going;
89        let token = cancel.clone();
90        let join = tokio::spawn(async move {
91            crate::local::run(
92                req.plan.graph,
93                req.repo_root,
94                req.pipeline_slug,
95                parallelism,
96                registry,
97                tx,
98                token,
99                keep_going,
100            )
101            .await
102        });
103        Ok(BackendHandle::spawn(rx, cancel, join))
104    }
105}