use std::num::NonZeroU64;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use hm_vm::{HmVm, ImageRegistry, VmBackend, VmConfig};
use crate::local::{RunnerRegistry, VmRunner};
use crate::{BackendError, BackendHandle, Capabilities, ExecutionBackend, Result, RunRequest};
const REGISTRY_CAPACITY: NonZeroU64 = NonZeroU64::new(64).expect("64 is non-zero");
#[derive(Debug)]
pub struct LocalBackend {
parallelism: NonZeroUsize,
vm_backend: Arc<dyn VmBackend>,
}
impl LocalBackend {
#[must_use]
pub fn new(parallelism: NonZeroUsize, vm_backend: Arc<dyn VmBackend>) -> Self {
Self {
parallelism,
vm_backend,
}
}
fn build_registry(&self) -> Result<RunnerRegistry> {
let cache_dir = hm_util::dirs::hm_cache_dir().ok_or_else(|| {
BackendError::Local("cannot resolve the Harmont cache directory".into())
})?;
let registry = ImageRegistry::open(&cache_dir.join("registry.db"), REGISTRY_CAPACITY)
.map_err(|e| BackendError::Local(format!("opening snapshot registry: {e:#}")))?;
let config = VmConfig {
memory_mib: Some(8192),
disk_size_gb: Some(10),
..Default::default()
};
let hmvm = HmVm::new(Arc::clone(&self.vm_backend), registry, config);
let mut runners = RunnerRegistry::new();
runners.register(Arc::new(VmRunner::new(Arc::new(hmvm))), true);
Ok(runners)
}
}
#[async_trait::async_trait]
impl ExecutionBackend for LocalBackend {
fn name(&self) -> &'static str {
"local"
}
fn capabilities(&self) -> Capabilities {
Capabilities::local()
}
async fn start(&self, req: RunRequest) -> Result<BackendHandle> {
let registry = Arc::new(self.build_registry()?);
let (tx, rx) = mpsc::channel(1024);
let cancel = CancellationToken::new();
let parallelism = self.parallelism;
let keep_going = req.options.keep_going;
let token = cancel.clone();
let join = tokio::spawn(async move {
crate::local::run(
req.plan.graph,
req.repo_root,
req.pipeline_slug,
parallelism,
registry,
tx,
token,
keep_going,
)
.await
});
Ok(BackendHandle::spawn(rx, cancel, join))
}
}