1use std::num::NonZeroU64;
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13
14use hm_vm::{HmVm, ImageRegistry, VmBackend, VmConfig};
15
16use crate::local::{RunnerRegistry, VmRunner};
17use crate::{BackendError, BackendHandle, Capabilities, ExecutionBackend, Result, RunRequest};
18
19const REGISTRY_CAPACITY: NonZeroU64 = NonZeroU64::new(64).expect("64 is non-zero");
22
23#[derive(Debug)]
30pub struct LocalBackend {
31 parallelism: NonZeroUsize,
32 vm_backend: Arc<dyn VmBackend>,
33}
34
35impl LocalBackend {
36 #[must_use]
42 pub fn new(parallelism: NonZeroUsize, vm_backend: Arc<dyn VmBackend>) -> Self {
43 Self {
44 parallelism,
45 vm_backend,
46 }
47 }
48
49 fn build_registry(&self) -> Result<RunnerRegistry> {
53 let cache_dir = hm_util::dirs::hm_cache_dir().ok_or_else(|| {
54 BackendError::Local("cannot resolve the Harmont cache directory".into())
55 })?;
56 let registry = ImageRegistry::open(&cache_dir.join("registry.db"), REGISTRY_CAPACITY)
57 .map_err(|e| BackendError::Local(format!("opening snapshot registry: {e:#}")))?;
58
59 let config = VmConfig {
60 memory_mib: Some(8192),
61 disk_size_gb: Some(10),
62 ..Default::default()
63 };
64
65 let hmvm = HmVm::new(Arc::clone(&self.vm_backend), registry, config);
66
67 let mut runners = RunnerRegistry::new();
68 runners.register(Arc::new(VmRunner::new(Arc::new(hmvm))), true);
69 Ok(runners)
70 }
71}
72
73#[async_trait::async_trait]
74impl ExecutionBackend for LocalBackend {
75 fn name(&self) -> &'static str {
76 "local"
77 }
78
79 fn capabilities(&self) -> Capabilities {
80 Capabilities::local()
81 }
82
83 async fn start(&self, req: RunRequest) -> Result<BackendHandle> {
84 let registry = Arc::new(self.build_registry()?);
85 let (tx, rx) = mpsc::channel(1024);
86 let cancel = CancellationToken::new();
87 let parallelism = self.parallelism;
88 let keep_going = req.options.keep_going;
89 let token = cancel.clone();
90 let join = tokio::spawn(async move {
91 crate::local::run(
92 req.plan.graph,
93 req.repo_root,
94 req.pipeline_slug,
95 parallelism,
96 registry,
97 tx,
98 token,
99 keep_going,
100 )
101 .await
102 });
103 Ok(BackendHandle::spawn(rx, cancel, join))
104 }
105}