use std::collections::HashMap;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use dashmap::DashMap;
use parking_lot::Mutex;
use smith_config::executor::VmPoolConfig;
use tar::Builder;
use tokio::process::Command;
use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use zstd::Encoder;
#[derive(Clone, Debug)]
pub struct VmPoolRuntimeConfig {
pub enabled: bool,
pub volume_root: PathBuf,
pub nix_profile: Option<String>,
pub shell: PathBuf,
pub shell_args: Vec<String>,
pub env: HashMap<String, String>,
pub max_vms: usize,
pub idle_shutdown: Duration,
pub prune_after: Duration,
pub backup_after: Option<Duration>,
pub backup_destination: Option<PathBuf>,
pub bootstrap_command: Option<Vec<String>>,
}
impl From<&VmPoolConfig> for VmPoolRuntimeConfig {
fn from(config: &VmPoolConfig) -> Self {
Self {
enabled: config.enabled,
volume_root: config.volume_root.clone(),
nix_profile: config.nix_profile.clone(),
shell: config.shell.clone(),
shell_args: config.shell_args.clone(),
env: config.env.clone(),
max_vms: config.max_vms,
idle_shutdown: Duration::from_secs(config.idle_shutdown_seconds),
prune_after: Duration::from_secs(config.prune_after_seconds),
backup_after: config.backup_after_seconds.map(Duration::from_secs),
backup_destination: config.backup_destination.clone(),
bootstrap_command: config.bootstrap_command.clone(),
}
}
}
pub struct MicroVmManager {
config: Arc<VmPoolRuntimeConfig>,
vms: Arc<DashMap<Uuid, Arc<MicroVm>>>,
cleanup_handle: Option<JoinHandle<()>>,
}
impl fmt::Debug for MicroVmManager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MicroVmManager")
.field("enabled", &self.config.enabled)
.field("volume_root", &self.config.volume_root)
.field("active_vms", &self.vms.len())
.finish()
}
}
impl MicroVmManager {
pub fn new(config: VmPoolRuntimeConfig) -> Result<Arc<Self>> {
let config = Arc::new(config);
let vms = Arc::new(DashMap::new());
let cleanup_handle = if config.enabled {
Some(Self::spawn_cleanup_task(vms.clone()))
} else {
None
};
Ok(Arc::new(Self {
config,
vms,
cleanup_handle,
}))
}
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
pub async fn acquire(&self, session_id: Uuid) -> Result<VmExecutionGuard> {
if !self.config.enabled {
return Err(anyhow!(
"VM pool is disabled; no persistent environments available"
));
}
let vm = self.get_or_create_vm(session_id).await?;
let lock = vm.command_lock.clone().lock_owned().await;
vm.begin_execution();
Ok(VmExecutionGuard { vm, lock })
}
async fn get_or_create_vm(&self, session_id: Uuid) -> Result<Arc<MicroVm>> {
if let Some(existing) = self.vms.get(&session_id) {
return Ok(existing.clone());
}
if self.vms.len() >= self.config.max_vms {
if !self.try_evict_stopped_vm().await? {
return Err(anyhow!(
"VM pool is at capacity ({} sessions)",
self.config.max_vms
));
}
}
let vm = Arc::new(MicroVm::new(session_id, self.config.clone()).await?);
use dashmap::mapref::entry::Entry;
match self.vms.entry(session_id) {
Entry::Occupied(entry) => Ok(entry.get().clone()),
Entry::Vacant(entry) => {
entry.insert(vm.clone());
Ok(vm)
}
}
}
async fn try_evict_stopped_vm(&self) -> Result<bool> {
let mut candidates = Vec::new();
for entry in self.vms.iter() {
candidates.push((*entry.key(), entry.value().clone()));
}
let mut selected: Option<(Uuid, Instant)> = None;
for (session_id, vm) in candidates {
let snapshot = vm.snapshot();
if snapshot.status == VmStatus::Stopped {
let stopped_at = snapshot.stopped_at.unwrap_or_else(|| snapshot.last_used);
if selected
.as_ref()
.map(|(_, ts)| stopped_at < *ts)
.unwrap_or(true)
{
selected = Some((session_id, stopped_at));
}
}
}
if let Some((session_id, _)) = selected {
if let Some((_, vm)) = self.vms.remove(&session_id) {
vm.force_prune().await;
return Ok(true);
}
}
Ok(false)
}
fn spawn_cleanup_task(vms: Arc<DashMap<Uuid, Arc<MicroVm>>>) -> JoinHandle<()> {
tokio::spawn(async move {
let interval = Duration::from_secs(60);
loop {
sleep(interval).await;
if let Err(err) = Self::maintenance_pass(&vms).await {
warn!(error = %err, "Background VM maintenance failed");
}
}
})
}
async fn maintenance_pass(vms: &DashMap<Uuid, Arc<MicroVm>>) -> Result<()> {
let mut snapshot = Vec::new();
for entry in vms.iter() {
snapshot.push((*entry.key(), entry.value().clone()));
}
for (session_id, vm) in snapshot {
match vm.perform_maintenance().await {
Ok(MaintenanceAction::Keep) => {}
Ok(MaintenanceAction::Remove) => {
vms.remove(&session_id);
}
Err(err) => {
warn!(
session_id = %session_id,
error = %err,
"Failed VM maintenance cycle"
);
}
}
}
Ok(())
}
pub async fn run_maintenance_now(&self) -> Result<()> {
if !self.config.enabled {
return Ok(());
}
Self::maintenance_pass(&self.vms).await
}
}
impl Drop for MicroVmManager {
fn drop(&mut self) {
if let Some(handle) = &self.cleanup_handle {
handle.abort();
}
}
}
pub struct VmExecutionGuard {
vm: Arc<MicroVm>,
lock: OwnedMutexGuard<()>,
}
impl VmExecutionGuard {
pub fn session_id(&self) -> Uuid {
self.vm.session_id
}
pub fn shell_path(&self) -> &Path {
&self.vm.config.shell
}
pub fn shell_args(&self) -> &[String] {
&self.vm.config.shell_args
}
pub fn environment(&self) -> &HashMap<String, String> {
&self.vm.config.env
}
pub fn workdir(&self) -> &Path {
self.vm.volume_path.as_path()
}
}
impl Drop for VmExecutionGuard {
fn drop(&mut self) {
self.vm.finish_execution();
}
}
struct MicroVm {
session_id: Uuid,
root: PathBuf,
volume_path: PathBuf,
lifecycle: Mutex<VmLifecycleState>,
command_lock: Arc<AsyncMutex<()>>,
config: Arc<VmPoolRuntimeConfig>,
backup_completed: AtomicBool,
}
impl MicroVm {
async fn new(session_id: Uuid, config: Arc<VmPoolRuntimeConfig>) -> Result<Self> {
let vm_root = config.volume_root.join(session_id.to_string());
tokio::fs::create_dir_all(&vm_root)
.await
.with_context(|| format!("Failed to create VM root at {}", vm_root.display()))?;
let volume_path = vm_root.join("volume");
tokio::fs::create_dir_all(&volume_path)
.await
.with_context(|| format!("Failed to create VM volume {}", volume_path.display()))?;
let vm = Self {
session_id,
root: vm_root,
volume_path,
lifecycle: Mutex::new(VmLifecycleState::new()),
command_lock: Arc::new(AsyncMutex::new(())),
config,
backup_completed: AtomicBool::new(false),
};
vm.bootstrap().await?;
Ok(vm)
}
fn snapshot(&self) -> VmLifecycleSnapshot {
let state = self.lifecycle.lock();
VmLifecycleSnapshot {
status: state.status,
last_used: state.last_used,
stopped_at: state.stopped_at,
}
}
fn begin_execution(&self) {
let mut state = self.lifecycle.lock();
state.status = VmStatus::Active;
state.last_used = Instant::now();
state.stopped_at = None;
}
fn finish_execution(&self) {
let mut state = self.lifecycle.lock();
state.last_used = Instant::now();
}
async fn perform_maintenance(&self) -> Result<MaintenanceAction> {
let now = Instant::now();
let stopped_at = {
let mut state = self.lifecycle.lock();
if state.status == VmStatus::Active {
if now.duration_since(state.last_used) >= self.config.idle_shutdown {
info!(
session_id = %self.session_id,
idle_seconds = self.config.idle_shutdown.as_secs(),
"VM idle threshold reached; marking stopped"
);
state.status = VmStatus::Stopped;
state.stopped_at = Some(now);
}
return Ok(MaintenanceAction::Keep);
}
state.stopped_at.unwrap_or_else(|| state.last_used)
};
if let Some(backup_after) = self.config.backup_after {
if now.duration_since(stopped_at) >= backup_after
&& self
.backup_completed
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
self.perform_backup().await?;
}
}
if now.duration_since(stopped_at) >= self.config.prune_after {
self.prune().await?;
return Ok(MaintenanceAction::Remove);
}
Ok(MaintenanceAction::Keep)
}
async fn bootstrap(&self) -> Result<()> {
if let Some(command) = &self.config.bootstrap_command {
self.run_custom_command(command)
.await
.context("Bootstrap command failed")?;
}
if let Some(profile) = &self.config.nix_profile {
self.ensure_nix_profile(profile)
.await
.context("Failed to hydrate nix profile")?;
}
Ok(())
}
async fn run_custom_command(&self, command: &[String]) -> Result<()> {
if command.is_empty() {
return Ok(());
}
let (program, args) = command
.split_first()
.ok_or_else(|| anyhow!("Bootstrap command must contain at least one argument"))?;
let mut cmd = Command::new(program);
cmd.args(args);
cmd.current_dir(&self.volume_path);
cmd.kill_on_drop(true);
cmd.stdout(std::process::Stdio::null());
cmd.stderr(std::process::Stdio::null());
let status = cmd
.status()
.await
.with_context(|| format!("Failed to spawn bootstrap command '{}'", program))?;
if !status.success() {
return Err(anyhow!(
"Bootstrap command {:?} exited with {:?}",
command,
status.code()
));
}
Ok(())
}
async fn ensure_nix_profile(&self, profile: &str) -> Result<()> {
let mut cmd = Command::new("nix");
cmd.arg("develop").arg(profile).arg("--command").arg("true");
cmd.current_dir(&self.volume_path);
cmd.kill_on_drop(true);
cmd.stdout(std::process::Stdio::null());
cmd.stderr(std::process::Stdio::null());
let status = cmd
.status()
.await
.with_context(|| format!("Failed to invoke nix develop for {profile}"))?;
if !status.success() {
warn!(
session_id = %self.session_id,
profile,
exit_code = ?status.code(),
"nix develop returned non-zero status"
);
}
Ok(())
}
async fn perform_backup(&self) -> Result<()> {
let dest_root = match &self.config.backup_destination {
Some(path) => path.clone(),
None => return Ok(()),
};
tokio::fs::create_dir_all(&dest_root)
.await
.with_context(|| {
format!(
"Failed to create backup destination directory {}",
dest_root.display()
)
})?;
let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%SZ");
let backup_path = dest_root.join(format!("{}-{}.tar.zst", self.session_id, timestamp));
let backup_path_for_log = backup_path.clone();
let volume = self.volume_path.clone();
tokio::task::spawn_blocking(move || -> Result<()> {
let file = std::fs::File::create(&backup_path)
.with_context(|| format!("Failed to create backup {}", backup_path.display()))?;
let encoder = Encoder::new(file, 0)?;
let mut tar_builder = Builder::new(encoder.auto_finish());
tar_builder
.append_dir_all(".", &volume)
.with_context(|| format!("Failed to archive volume {}", volume.display()))?;
tar_builder.finish()?;
Ok(())
})
.await
.context("Backup task join failure")??;
info!(
session_id = %self.session_id,
path = %backup_path_for_log.display(),
"VM volume backup completed"
);
Ok(())
}
async fn prune(&self) -> Result<()> {
tokio::fs::remove_dir_all(&self.root)
.await
.with_context(|| format!("Failed to prune VM directory {}", self.root.display()))?;
info!(
session_id = %self.session_id,
root = %self.root.display(),
"Pruned VM volume"
);
Ok(())
}
async fn force_prune(&self) {
if let Err(err) = self.prune().await {
warn!(
session_id = %self.session_id,
error = %err,
"Force prune failed"
);
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum VmStatus {
Active,
Stopped,
}
#[derive(Clone, Copy, Debug)]
struct VmLifecycleState {
status: VmStatus,
last_used: Instant,
stopped_at: Option<Instant>,
}
impl VmLifecycleState {
fn new() -> Self {
Self {
status: VmStatus::Active,
last_used: Instant::now(),
stopped_at: None,
}
}
}
#[derive(Clone, Copy, Debug)]
struct VmLifecycleSnapshot {
status: VmStatus,
last_used: Instant,
stopped_at: Option<Instant>,
}
enum MaintenanceAction {
Keep,
Remove,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn create_test_vm_pool_config(volume_root: PathBuf) -> VmPoolConfig {
VmPoolConfig {
enabled: true,
volume_root,
nix_profile: None,
shell: PathBuf::from("/bin/bash"),
shell_args: vec!["-c".to_string()],
env: HashMap::new(),
max_vms: 4,
idle_shutdown_seconds: 60,
prune_after_seconds: 120,
backup_after_seconds: None,
backup_destination: None,
bootstrap_command: None,
}
}
#[test]
fn test_vm_pool_runtime_config_from_vm_pool_config() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
assert!(runtime_config.enabled);
assert_eq!(runtime_config.volume_root, temp.path());
assert_eq!(runtime_config.shell, PathBuf::from("/bin/bash"));
assert_eq!(runtime_config.shell_args, vec!["-c".to_string()]);
assert_eq!(runtime_config.max_vms, 4);
assert_eq!(runtime_config.idle_shutdown, Duration::from_secs(60));
assert_eq!(runtime_config.prune_after, Duration::from_secs(120));
assert!(runtime_config.backup_after.is_none());
assert!(runtime_config.backup_destination.is_none());
assert!(runtime_config.bootstrap_command.is_none());
}
#[test]
fn test_vm_pool_runtime_config_with_backup() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.backup_after_seconds = Some(300);
config.backup_destination = Some(PathBuf::from("/tmp/backups"));
let runtime_config = VmPoolRuntimeConfig::from(&config);
assert_eq!(runtime_config.backup_after, Some(Duration::from_secs(300)));
assert_eq!(
runtime_config.backup_destination,
Some(PathBuf::from("/tmp/backups"))
);
}
#[test]
fn test_vm_pool_runtime_config_with_nix_profile() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.nix_profile = Some("github:owner/repo".to_string());
let runtime_config = VmPoolRuntimeConfig::from(&config);
assert_eq!(
runtime_config.nix_profile,
Some("github:owner/repo".to_string())
);
}
#[test]
fn test_vm_pool_runtime_config_with_bootstrap() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.bootstrap_command = Some(vec!["echo".to_string(), "hello".to_string()]);
let runtime_config = VmPoolRuntimeConfig::from(&config);
assert_eq!(
runtime_config.bootstrap_command,
Some(vec!["echo".to_string(), "hello".to_string()])
);
}
#[test]
fn test_vm_pool_runtime_config_with_env() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config
.env
.insert("HOME".to_string(), "/home/test".to_string());
config
.env
.insert("PATH".to_string(), "/usr/bin".to_string());
let runtime_config = VmPoolRuntimeConfig::from(&config);
assert_eq!(runtime_config.env.len(), 2);
assert_eq!(
runtime_config.env.get("HOME"),
Some(&"/home/test".to_string())
);
assert_eq!(
runtime_config.env.get("PATH"),
Some(&"/usr/bin".to_string())
);
}
#[test]
fn test_vm_pool_runtime_config_disabled() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.enabled = false;
let runtime_config = VmPoolRuntimeConfig::from(&config);
assert!(!runtime_config.enabled);
}
#[test]
fn test_vm_lifecycle_state_new() {
let state = VmLifecycleState::new();
assert_eq!(state.status, VmStatus::Active);
assert!(state.stopped_at.is_none());
}
#[test]
fn test_vm_status_equality() {
assert_eq!(VmStatus::Active, VmStatus::Active);
assert_eq!(VmStatus::Stopped, VmStatus::Stopped);
assert_ne!(VmStatus::Active, VmStatus::Stopped);
}
#[tokio::test]
async fn test_micro_vm_manager_new_disabled() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.enabled = false;
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
assert!(!manager.is_enabled());
assert!(manager.cleanup_handle.is_none());
}
#[tokio::test]
async fn test_micro_vm_manager_new_enabled() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
assert!(manager.is_enabled());
assert!(manager.cleanup_handle.is_some());
}
#[tokio::test]
async fn test_micro_vm_manager_debug_fmt() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
let debug_str = format!("{:?}", manager);
assert!(debug_str.contains("MicroVmManager"));
assert!(debug_str.contains("enabled"));
assert!(debug_str.contains("volume_root"));
assert!(debug_str.contains("active_vms"));
}
#[tokio::test]
async fn test_micro_vm_manager_acquire_disabled() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.enabled = false;
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
let result = manager.acquire(Uuid::new_v4()).await;
assert!(result.is_err());
let err = result.err().unwrap();
assert!(err.to_string().contains("disabled"));
}
#[tokio::test]
async fn test_micro_vm_manager_run_maintenance_disabled() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.enabled = false;
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
let result = manager.run_maintenance_now().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_micro_vm_manager_maintenance_empty() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
let result = manager.run_maintenance_now().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_vm_execution_guard_properties() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
let session_id = Uuid::new_v4();
let guard = manager.acquire(session_id).await.unwrap();
assert_eq!(guard.session_id(), session_id);
assert_eq!(guard.shell_path(), PathBuf::from("/bin/bash"));
assert_eq!(guard.shell_args(), &["-c".to_string()]);
assert!(guard.environment().is_empty());
assert!(guard.workdir().ends_with("volume"));
}
#[tokio::test]
async fn test_micro_vm_manager_acquire_same_session() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
let session_id = Uuid::new_v4();
{
let _guard = manager.acquire(session_id).await.unwrap();
}
{
let guard = manager.acquire(session_id).await.unwrap();
assert_eq!(guard.session_id(), session_id);
}
}
#[tokio::test]
async fn test_micro_vm_manager_max_vms() {
let temp = tempdir().unwrap();
let mut config = create_test_vm_pool_config(temp.path().to_path_buf());
config.max_vms = 1;
let runtime_config = VmPoolRuntimeConfig::from(&config);
let manager = MicroVmManager::new(runtime_config).unwrap();
let session1 = Uuid::new_v4();
let guard1 = manager.acquire(session1).await.unwrap();
let session2 = Uuid::new_v4();
let result = manager.acquire(session2).await;
assert_eq!(guard1.session_id(), session1);
assert!(result.is_err() || result.is_ok());
}
#[test]
fn test_vm_lifecycle_snapshot_creation() {
let state = VmLifecycleState::new();
assert_eq!(state.status, VmStatus::Active);
assert!(state.stopped_at.is_none());
}
#[test]
fn test_maintenance_action_variants() {
let keep = MaintenanceAction::Keep;
let remove = MaintenanceAction::Remove;
match keep {
MaintenanceAction::Keep => {}
MaintenanceAction::Remove => panic!("Expected Keep"),
}
match remove {
MaintenanceAction::Keep => panic!("Expected Remove"),
MaintenanceAction::Remove => {}
}
}
#[test]
fn test_vm_pool_runtime_config_debug() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
let debug_str = format!("{:?}", runtime_config);
assert!(debug_str.contains("VmPoolRuntimeConfig"));
assert!(debug_str.contains("enabled"));
assert!(debug_str.contains("volume_root"));
}
#[test]
fn test_vm_pool_runtime_config_clone() {
let temp = tempdir().unwrap();
let config = create_test_vm_pool_config(temp.path().to_path_buf());
let runtime_config = VmPoolRuntimeConfig::from(&config);
let cloned = runtime_config.clone();
assert_eq!(runtime_config.enabled, cloned.enabled);
assert_eq!(runtime_config.volume_root, cloned.volume_root);
assert_eq!(runtime_config.max_vms, cloned.max_vms);
}
}