use std::path::PathBuf;
use std::process::Stdio;
use anyhow::{Context, Result};
use async_trait::async_trait;
use tokio::process::Command;
use tracing::{debug, info, warn};
use crate::compute::{ComputeBackend, ContainerConfig, NodeStatus};
/// Root directory under which every VM gets its own `<id>/` state directory.
const VM_ROOT: &str = "/var/lib/paygress/vm";
/// Ubuntu Jammy cloud image fetched when no base image is present yet.
const DEFAULT_BASE_IMAGE_URL: &str =
    "https://cloud-images.ubuntu.com/jammy/current/jammy-server-cloudimg-amd64.img";
/// File name the default base image is stored under, inside `<VM_ROOT>/base/`.
const DEFAULT_BASE_IMAGE_FILE: &str = "jammy-server-cloudimg-amd64.img";

/// Host-side configuration for the KVM backend.
#[derive(Debug, Clone)]
pub struct KvmConfig {
    /// Local path of the shared base image that per-VM overlay disks are
    /// backed by; fetched from `base_image_url` when it does not exist.
    pub base_image_path: PathBuf,
    /// URL the base image is downloaded from when `base_image_path` is missing.
    pub base_image_url: String,
    /// Directory holding one state sub-directory per VM id.
    pub vm_root: PathBuf,
}

impl Default for KvmConfig {
    /// Keeps VM state under [`VM_ROOT`] and stores the Jammy cloud image at
    /// `<VM_ROOT>/base/jammy-server-cloudimg-amd64.img`.
    fn default() -> Self {
        let vm_root = PathBuf::from(VM_ROOT);
        let base_image_path = vm_root.join("base").join(DEFAULT_BASE_IMAGE_FILE);
        KvmConfig {
            base_image_path,
            base_image_url: String::from(DEFAULT_BASE_IMAGE_URL),
            vm_root,
        }
    }
}
/// Compute backend that runs each workload as a QEMU/KVM virtual machine.
///
/// All per-VM state (overlay disk, cloud-init seed ISO, pidfile, serial log)
/// lives in a per-id directory under `config.vm_root`.
pub struct KvmBackend {
    /// Host paths and base-image source used for every VM.
    config: KvmConfig,
}
impl KvmBackend {
    /// Creates a backend that keeps all VM state under `config.vm_root`.
    pub fn new(config: KvmConfig) -> Self {
        Self { config }
    }

    /// State directory for VM `id`: `<vm_root>/<id>`.
    fn vm_dir(&self, id: u32) -> PathBuf {
        self.config.vm_root.join(id.to_string())
    }

    /// qcow2 overlay disk for VM `id`, backed by the shared base image.
    fn disk_path(&self, id: u32) -> PathBuf {
        self.vm_dir(id).join("disk.qcow2")
    }

    /// cloud-init seed ISO (volume id `cidata`) for VM `id`.
    fn seed_path(&self, id: u32) -> PathBuf {
        self.vm_dir(id).join("seed.iso")
    }

    /// File QEMU writes its PID to (passed via `-pidfile`).
    fn pidfile_path(&self, id: u32) -> PathBuf {
        self.vm_dir(id).join("qemu.pid")
    }

    /// File receiving the guest's serial console output (`-serial file:`).
    fn serial_log_path(&self, id: u32) -> PathBuf {
        self.vm_dir(id).join("serial.log")
    }

    /// Escapes a value embedded in a QEMU `key=value,...` option string, where
    /// a literal `,` must be written as `,,`.
    fn qemu_opt_escape(value: &str) -> String {
        value.replace(',', ",,")
    }

    /// Checks that this host can run KVM guests.
    ///
    /// Returns the first line of `qemu-system-x86_64 --version`.
    ///
    /// # Errors
    /// Fails if `/dev/kvm` is missing, `qemu-system-x86_64` cannot be spawned,
    /// or `--version` exits unsuccessfully.
    pub async fn check_kvm_available() -> Result<String> {
        if !std::path::Path::new("/dev/kvm").exists() {
            anyhow::bail!(
                "/dev/kvm not present; this host does not support KVM. \
                 Use the Docker or LXD backend, or move to a host with \
                 nested virtualization enabled."
            );
        }
        let out = Command::new("qemu-system-x86_64")
            .arg("--version")
            .output()
            .await
            .context("qemu-system-x86_64 not found on PATH; install qemu-system-x86")?;
        if !out.status.success() {
            anyhow::bail!(
                "qemu-system-x86_64 --version failed: {}",
                String::from_utf8_lossy(&out.stderr)
            );
        }
        Ok(String::from_utf8_lossy(&out.stdout)
            .lines()
            .next()
            .unwrap_or("")
            .to_string())
    }

    /// Ensures the shared base image exists at `config.base_image_path`,
    /// downloading it with `curl` from `config.base_image_url` if it does not.
    ///
    /// The download goes to a uniquely named `.part` file next to the target.
    /// It is renamed into place only after `curl` succeeds, so an interrupted
    /// or failed download never leaves a truncated image at the final path.
    /// The `exists()` fast path would otherwise trust such an image forever.
    ///
    /// # Errors
    /// Fails if the parent directory cannot be created, `curl` cannot be run or
    /// exits unsuccessfully, or the finished download cannot be moved into place.
    async fn ensure_base_image(&self) -> Result<()> {
        // Gives concurrent downloads within this process distinct temp files.
        static DOWNLOAD_SEQ: std::sync::atomic::AtomicU64 =
            std::sync::atomic::AtomicU64::new(0);

        let dest = &self.config.base_image_path;
        if dest.exists() {
            return Ok(());
        }
        let parent = dest.parent().context("base_image_path has no parent")?;
        let file_name = dest
            .file_name()
            .context("base_image_path has no file name")?;
        tokio::fs::create_dir_all(parent)
            .await
            .context("create base image directory")?;
        info!(
            "Downloading base image from {} to {}",
            self.config.base_image_url,
            dest.display()
        );

        let seq = DOWNLOAD_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let partial = parent.join(format!(
            "{}.{}-{}.part",
            file_name.to_string_lossy(),
            std::process::id(),
            seq
        ));
        let out = Command::new("curl")
            .arg("-fsSL")
            .arg("-o")
            .arg(&partial)
            .arg(&self.config.base_image_url)
            .output()
            .await
            .context("invoke curl to fetch base image")?;
        if !out.status.success() {
            // Best effort: curl may or may not have created the file.
            let _ = tokio::fs::remove_file(&partial).await;
            anyhow::bail!(
                "curl failed to fetch base image: {}",
                String::from_utf8_lossy(&out.stderr)
            );
        }
        if dest.exists() {
            // A concurrent caller finished first. Keep its copy so overlays
            // already created against it keep a stable backing file.
            let _ = tokio::fs::remove_file(&partial).await;
            return Ok(());
        }
        if let Err(e) = tokio::fs::rename(&partial, dest).await {
            let _ = tokio::fs::remove_file(&partial).await;
            return Err(e).context("move downloaded base image into place");
        }
        Ok(())
    }

    /// Renders the cloud-init `user-data` document for a new VM.
    ///
    /// The document does three things:
    /// - enables SSH password authentication;
    /// - sets root's password to `password` via `chpasswd`;
    /// - writes an sshd drop-in that permits root login.
    ///
    /// The drop-in is needed because sshd's default `PermitRootLogin` setting
    /// rejects root password logins even with `ssh_pwauth` on.
    fn user_data(password: &str) -> String {
        // Assembled with `concat!` instead of `\`-continued lines. A trailing
        // `\` in a Rust string literal also swallows the next line's leading
        // whitespace, which flattened the YAML nesting under `chpasswd:` and
        // left the root entry outside the `list` block scalar.
        format!(
            concat!(
                "#cloud-config\n",
                "ssh_pwauth: true\n",
                "disable_root: false\n",
                "chpasswd:\n",
                "  list: |\n",
                "    root:{}\n",
                "  expire: false\n",
                "# Keep the boot fast: skip waiting for slow services.\n",
                "timezone: Etc/UTC\n",
                "write_files:\n",
                "  - path: /etc/ssh/sshd_config.d/10-paygress-root-login.conf\n",
                "    content: |\n",
                "      PermitRootLogin yes\n",
            ),
            password
        )
    }

    /// Renders the cloud-init `meta-data` document; instance id and hostname
    /// are both `paygress-<id>`.
    fn meta_data(id: u32) -> String {
        format!(
            "instance-id: paygress-{0}\nlocal-hostname: paygress-{0}\n",
            id
        )
    }

    /// Writes `user-data`/`meta-data` into the VM directory and packs them into
    /// a `cidata`-labelled seed ISO with `genisoimage`.
    ///
    /// # Errors
    /// Fails if `password` contains a control character, a file cannot be
    /// written, or `genisoimage` cannot be run or exits unsuccessfully.
    async fn make_seed_iso(&self, id: u32, password: &str) -> Result<()> {
        // The password is spliced into a line-oriented chpasswd list inside a
        // YAML block scalar. A newline (or other control character) would
        // corrupt that entry or inject arbitrary cloud-config keys.
        if password.chars().any(char::is_control) {
            anyhow::bail!("VM password must not contain control characters");
        }
        let dir = self.vm_dir(id);
        let user_path = dir.join("user-data");
        let meta_path = dir.join("meta-data");
        tokio::fs::write(&user_path, Self::user_data(password))
            .await
            .context("write user-data")?;
        tokio::fs::write(&meta_path, Self::meta_data(id))
            .await
            .context("write meta-data")?;
        let out = Command::new("genisoimage")
            .arg("-output")
            .arg(self.seed_path(id))
            .args(["-volid", "cidata", "-joliet", "-rock"])
            .arg(&user_path)
            .arg(&meta_path)
            .output()
            .await
            .context("invoke genisoimage")?;
        if !out.status.success() {
            anyhow::bail!(
                "genisoimage failed: {}",
                String::from_utf8_lossy(&out.stderr)
            );
        }
        Ok(())
    }

    /// Builds the `qemu-system-x86_64` argument vector for `config`.
    ///
    /// Resources:
    /// - `config.cpu_cores` vCPUs, at least 1;
    /// - `config.memory_mb` MiB of RAM, at least 512;
    /// - the overlay disk and the cloud-init seed as virtio drives.
    ///
    /// Networking is user-mode. It forwards host port `config.host_port` (0
    /// when unset) to guest port 22, plus every entry of
    /// `config.template_ports`.
    ///
    /// QEMU daemonizes, records its PID in the VM's pidfile, and writes the
    /// serial console to `serial.log`.
    pub fn qemu_argv(&self, config: &ContainerConfig) -> Vec<String> {
        let id = config.id;
        let cores = config.cpu_cores.max(1);
        let mem_mb = config.memory_mb.max(512);
        let host_port = config.host_port.unwrap_or(0);

        let mut hostfwds = vec![format!("hostfwd=tcp::{}-:22", host_port)];
        hostfwds.extend(config.template_ports.iter().map(|p| {
            format!(
                "hostfwd={}::{}-:{}",
                p.protocol, p.host_port, p.container_port
            )
        }));
        let netdev = format!("user,id=net0,{}", hostfwds.join(","));

        // `-drive` values are comma-separated option lists, so commas in the
        // paths themselves must be doubled.
        let disk = Self::qemu_opt_escape(&self.disk_path(id).to_string_lossy());
        let seed = Self::qemu_opt_escape(&self.seed_path(id).to_string_lossy());

        vec![
            "-enable-kvm".to_string(),
            "-cpu".to_string(),
            "host".to_string(),
            "-machine".to_string(),
            "type=q35,accel=kvm".to_string(),
            "-smp".to_string(),
            cores.to_string(),
            "-m".to_string(),
            mem_mb.to_string(),
            "-drive".to_string(),
            format!("file={},if=virtio,format=qcow2", disk),
            "-drive".to_string(),
            format!("file={},if=virtio,format=raw,readonly=on", seed),
            "-netdev".to_string(),
            netdev,
            "-device".to_string(),
            "virtio-net-pci,netdev=net0".to_string(),
            "-daemonize".to_string(),
            "-pidfile".to_string(),
            self.pidfile_path(id).to_string_lossy().to_string(),
            // Headless without `-nographic`. QEMU refuses `-nographic` together
            // with `-daemonize` unless the monitor and parallel port are also
            // redirected, because it would attach them to stdio.
            "-display".to_string(),
            "none".to_string(),
            "-serial".to_string(),
            format!("file:{}", self.serial_log_path(id).display()),
        ]
    }

    /// Creates VM `id`'s qcow2 overlay disk on top of the shared base image
    /// (declared as qcow2 via `-F`), sized `size_gb` G with a floor of 5.
    ///
    /// # Errors
    /// Fails if `qemu-img` cannot be run or exits unsuccessfully.
    async fn create_overlay_disk(&self, id: u32, size_gb: u32) -> Result<()> {
        let out = Command::new("qemu-img")
            .args(["create", "-f", "qcow2", "-b"])
            .arg(&self.config.base_image_path)
            .args(["-F", "qcow2"])
            .arg(self.disk_path(id))
            .arg(format!("{}G", size_gb.max(5)))
            .output()
            .await
            .context("invoke qemu-img create")?;
        if !out.status.success() {
            anyhow::bail!(
                "qemu-img create failed: {}",
                String::from_utf8_lossy(&out.stderr)
            );
        }
        Ok(())
    }

    /// Reads the PID QEMU recorded for VM `id`.
    ///
    /// Returns `None` if the pidfile is missing, unparsable, or holds a
    /// non-positive value. `kill 0` / `kill -1` target process groups or every
    /// process, never a single VM.
    async fn read_pid(&self, id: u32) -> Option<i32> {
        let p = self.pidfile_path(id);
        let bytes = tokio::fs::read_to_string(&p).await.ok()?;
        bytes.trim().parse::<i32>().ok().filter(|&pid| pid > 0)
    }
}
/// Grace period `delete_container` gives QEMU to exit after SIGTERM before
/// escalating to SIGKILL.
const KVM_STOP_GRACE: std::time::Duration = std::time::Duration::from_secs(2);

/// Returns `true` when `pid` is a running process whose command name starts
/// with `qemu`.
///
/// A pidfile can outlive the QEMU process it names, and the kernel may reuse
/// that PID for an unrelated process. Checking `/proc/<pid>/comm` keeps us from
/// signalling it. Non-positive PIDs are rejected because `kill 0` / `kill -1`
/// target process groups rather than one process.
async fn is_live_qemu(pid: i32) -> bool {
    if pid <= 0 {
        return false;
    }
    tokio::fs::read_to_string(format!("/proc/{}/comm", pid))
        .await
        .map(|comm| comm.starts_with("qemu"))
        .unwrap_or(false)
}

/// Sends `signal` (a `kill(1)` flag such as `-TERM`) to `pid`.
///
/// Best effort: failures are logged rather than returned.
async fn signal_pid(pid: i32, signal: &str) {
    match Command::new("kill")
        .arg(signal)
        .arg(pid.to_string())
        .status()
        .await
    {
        Ok(status) if status.success() => {}
        Ok(status) => warn!("kill {} {} exited with {}", signal, pid, status),
        Err(e) => warn!("failed to run kill {} {}: {}", signal, pid, e),
    }
}

/// Polls until `pid` is no longer a live QEMU process or `grace` elapses.
///
/// Returns `true` if the process is gone.
async fn wait_for_qemu_exit(pid: i32, grace: std::time::Duration) -> bool {
    let deadline = tokio::time::Instant::now() + grace;
    while is_live_qemu(pid).await {
        if tokio::time::Instant::now() >= deadline {
            return false;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    true
}

#[async_trait]
impl ComputeBackend for KvmBackend {
    /// Returns the lowest id in `range_start..=range_end` that has no state
    /// directory under `vm_root`.
    ///
    /// Every numeric directory name counts as used, whether or not its VM is
    /// running. An unreadable `vm_root` is treated as empty.
    ///
    /// # Errors
    /// Fails when every id in the range is taken or the range is empty.
    async fn find_available_id(&self, range_start: u32, range_end: u32) -> Result<u32> {
        let mut used = std::collections::HashSet::new();
        if let Ok(mut entries) = tokio::fs::read_dir(&self.config.vm_root).await {
            while let Ok(Some(entry)) = entries.next_entry().await {
                if let Some(n) = entry
                    .file_name()
                    .to_str()
                    .and_then(|name| name.parse::<u32>().ok())
                {
                    used.insert(n);
                }
            }
        }
        (range_start..=range_end)
            .find(|id| !used.contains(id))
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "no available VM id in range {}..={}",
                    range_start,
                    range_end
                )
            })
    }

    /// Provisions and boots VM `config.id`, returning its name
    /// `paygress-vm-<id>`.
    ///
    /// Steps:
    /// 1. ensure the base image exists;
    /// 2. create the VM directory exclusively;
    /// 3. build the overlay disk and the cloud-init seed;
    /// 4. launch a daemonized QEMU.
    ///
    /// If step 3 or 4 fails, the half-built directory is removed so the id does
    /// not stay marked as used.
    ///
    /// # Errors
    /// Fails if the id's directory already exists, if any provisioning step
    /// fails, or if QEMU exits unsuccessfully or leaves no pidfile.
    async fn create_container(&self, config: &ContainerConfig) -> Result<String> {
        let id = config.id;
        info!(
            "Provisioning KVM VM: id={} cores={} mem={}MB disk={}GB",
            id, config.cpu_cores, config.memory_mb, config.storage_gb
        );
        self.ensure_base_image().await?;

        tokio::fs::create_dir_all(&self.config.vm_root)
            .await
            .context("create vm root directory")?;
        let dir = self.vm_dir(id);
        // Exclusive create. `qemu-img create` would otherwise silently overwrite
        // the disk of an existing (possibly running) VM with the same id.
        tokio::fs::create_dir(&dir).await.with_context(|| {
            format!("create vm directory {} (id already in use?)", dir.display())
        })?;

        let provisioned = async {
            self.create_overlay_disk(id, config.storage_gb)
                .await
                .context("create overlay disk")?;
            self.make_seed_iso(id, &config.password)
                .await
                .context("build cloud-init seed iso")?;
            let argv = self.qemu_argv(config);
            debug!("qemu argv: {:?}", argv);
            let out = Command::new("qemu-system-x86_64")
                .args(&argv)
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output()
                .await
                .context("invoke qemu-system-x86_64")?;
            if !out.status.success() {
                anyhow::bail!(
                    "qemu-system-x86_64 failed: {}",
                    String::from_utf8_lossy(&out.stderr)
                );
            }
            Ok::<(), anyhow::Error>(())
        }
        .await;
        if let Err(e) = provisioned {
            // QEMU has not started successfully, so this directory only holds
            // files we just created. Dropping it frees the id for reuse.
            if let Err(cleanup) = tokio::fs::remove_dir_all(&dir).await {
                warn!(
                    "cleanup of {} after failed provisioning: {}",
                    dir.display(),
                    cleanup
                );
            }
            return Err(e);
        }

        let pid = self
            .read_pid(id)
            .await
            .context("qemu daemonized but pidfile missing — boot failed before pidfile write?")?;
        info!("KVM VM id={} live (pid {})", id, pid);
        Ok(format!("paygress-vm-{}", id))
    }

    /// VMs are booted by `create_container`; there is no separate start step,
    /// so this is a no-op.
    async fn start_container(&self, _id: u32) -> Result<()> {
        Ok(())
    }

    /// Sends SIGTERM to the VM's QEMU process (best effort).
    ///
    /// Does nothing if there is no pidfile, or if the recorded PID is no longer
    /// a live QEMU process.
    async fn stop_container(&self, id: u32) -> Result<()> {
        if let Some(pid) = self.read_pid(id).await {
            if is_live_qemu(pid).await {
                signal_pid(pid, "-TERM").await;
            }
        }
        Ok(())
    }

    /// Terminates VM `id` and removes its state directory.
    ///
    /// Sends SIGTERM and waits up to [`KVM_STOP_GRACE`] for the process to exit.
    /// SIGKILL is sent only if it is still alive after that. Failure to remove
    /// the directory is logged, not returned.
    async fn delete_container(&self, id: u32) -> Result<()> {
        if let Some(pid) = self.read_pid(id).await {
            if is_live_qemu(pid).await {
                signal_pid(pid, "-TERM").await;
                if !wait_for_qemu_exit(pid, KVM_STOP_GRACE).await {
                    signal_pid(pid, "-KILL").await;
                }
            }
        }
        let dir = self.vm_dir(id);
        if dir.exists() {
            if let Err(e) = tokio::fs::remove_dir_all(&dir).await {
                warn!("remove {} non-fatal: {}", dir.display(), e);
            }
        }
        Ok(())
    }

    /// Not implemented for KVM yet: reports all-zero usage figures.
    async fn get_node_status(&self) -> Result<NodeStatus> {
        Ok(NodeStatus {
            cpu_usage: 0.0,
            memory_used: 0,
            memory_total: 0,
            disk_used: 0,
            disk_total: 0,
        })
    }

    /// Guests use QEMU user-mode networking and are reached through host port
    /// forwards, so no guest IP is reported.
    async fn get_container_ip(&self, _id: u32) -> Result<Option<String>> {
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compute::PortMapping;

    /// Backend using the default host layout under `VM_ROOT`.
    fn backend() -> KvmBackend {
        KvmBackend::new(KvmConfig::default())
    }

    /// Representative VM config: 2 vCPUs, 2048 MB RAM, 10 GB disk, SSH on host
    /// port 31000, and one TCP template port 18789 -> 18789.
    fn cfg(id: u32) -> ContainerConfig {
        ContainerConfig {
            id,
            name: format!("paygress-vm-{}", id),
            image: String::new(),
            cpu_cores: 2,
            memory_mb: 2048,
            storage_gb: 10,
            password: "secret".to_string(),
            ssh_key: None,
            host_port: Some(31000),
            template_ports: vec![PortMapping {
                host_port: 18789,
                container_port: 18789,
                protocol: "tcp",
            }],
            template_env: Default::default(),
            extra_runtime_args: vec![],
            data_path: None,
            volume_encryption_key: None,
        }
    }

    /// Argument immediately following the first occurrence of `flag`.
    fn value_after<'a>(argv: &'a [String], flag: &str) -> &'a str {
        let idx = argv.iter().position(|a| a == flag).unwrap();
        &argv[idx + 1]
    }

    #[test]
    fn qemu_argv_includes_kvm_acceleration_and_cpu_host() {
        let argv = backend().qemu_argv(&cfg(42));
        assert!(argv.contains(&"-enable-kvm".to_string()));
        assert_eq!(value_after(&argv, "-cpu"), "host");
    }

    #[test]
    fn qemu_argv_forwards_ssh_and_template_ports() {
        let argv = backend().qemu_argv(&cfg(42));
        let netdev = value_after(&argv, "-netdev");
        assert!(
            netdev.contains("hostfwd=tcp::31000-:22"),
            "ssh hostfwd missing in: {netdev}"
        );
        assert!(
            netdev.contains("hostfwd=tcp::18789-:18789"),
            "template hostfwd missing in: {netdev}"
        );
    }

    #[test]
    fn qemu_argv_pidfile_and_disk_paths_are_id_scoped() {
        let argv = backend().qemu_argv(&cfg(7));
        assert!(value_after(&argv, "-pidfile").contains("/7/qemu.pid"));
        // Each `-drive` flag paired with the value that follows it.
        let drives: Vec<&String> = argv
            .windows(2)
            .filter(|pair| pair[0] == "-drive")
            .map(|pair| &pair[1])
            .collect();
        assert!(drives.iter().any(|d| d.contains("/7/disk.qcow2")));
        assert!(drives.iter().any(|d| d.contains("/7/seed.iso")));
    }

    #[test]
    fn qemu_argv_memory_floor() {
        let tiny = ContainerConfig {
            memory_mb: 64,
            ..cfg(1)
        };
        let argv = backend().qemu_argv(&tiny);
        assert_eq!(value_after(&argv, "-m"), "512", "must clamp to 512 MB minimum");
    }

    #[test]
    fn paths_are_id_scoped_and_under_vm_root() {
        let backend = backend();
        let pairs = [(1u32, 2u32), (10, 20), (999, 1000)];
        for &(a, b) in &pairs {
            assert_ne!(backend.vm_dir(a), backend.vm_dir(b));
            assert_ne!(backend.disk_path(a), backend.disk_path(b));
            assert!(backend.vm_dir(a).starts_with(VM_ROOT));
        }
    }

    #[test]
    fn user_data_includes_password_and_enables_pwauth() {
        let rendered = KvmBackend::user_data("hunter2");
        assert!(rendered.contains("ssh_pwauth: true"));
        assert!(rendered.contains("root:hunter2"));
    }
}