use std::collections::VecDeque;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::assets::AssetPaths;
use crate::vmm::pool::{PoolClientError, WarmPool, WarmPoolError};
use crate::vmm::resources::VmResources;
use crate::vmm::runner::RunOptions;
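/// Errors returned by the image, bake, and VM lifecycle APIs in this module.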
#[non_exhaustive]
pub enum Error {
Image {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
Vm {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
Assets {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
Io(std::io::Error),
Network {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
CacheMiss {
msg: String,
},
CacheInvalid {
msg: String,
},
Bake {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
},
}
impl Error {
pub(crate) fn image_msg(msg: impl Into<String>) -> Self {
Error::Image { msg: msg.into(), source: None }
}
pub(crate) fn vm_msg(msg: impl Into<String>) -> Self {
Error::Vm { msg: msg.into(), source: None }
}
pub(crate) fn assets_msg(msg: impl Into<String>) -> Self {
Error::Assets { msg: msg.into(), source: None }
}
pub(crate) fn network_msg(msg: impl Into<String>) -> Self {
Error::Network { msg: msg.into(), source: None }
}
pub(crate) fn bake_msg(msg: impl Into<String>) -> Self {
Error::Bake { msg: msg.into(), source: None }
}
pub(crate) fn cache_miss(msg: impl Into<String>) -> Self {
Error::CacheMiss { msg: msg.into() }
}
pub(crate) fn cache_invalid(msg: impl Into<String>) -> Self {
Error::CacheInvalid { msg: msg.into() }
}
}
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, source } => f
.debug_struct("Image")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Vm { msg, source } => f
.debug_struct("Vm")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Assets { msg, source } => f
.debug_struct("Assets")
.field("msg", msg)
.field("source", source)
.finish(),
Error::Io(e) => f.debug_tuple("Io").field(e).finish(),
Error::Network { msg, source } => f
.debug_struct("Network")
.field("msg", msg)
.field("source", source)
.finish(),
Error::CacheMiss { msg } => f.debug_struct("CacheMiss").field("msg", msg).finish(),
Error::CacheInvalid { msg } => {
f.debug_struct("CacheInvalid").field("msg", msg).finish()
}
Error::Bake { msg, source } => f
.debug_struct("Bake")
.field("msg", msg)
.field("source", source)
.finish(),
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Image { msg, .. } => write!(f, "image: {msg}"),
Error::Vm { msg, .. } => write!(f, "vm: {msg}"),
Error::Assets { msg, .. } => write!(f, "assets: {msg}"),
Error::Io(e) => write!(f, "io: {e}"),
Error::Network { msg, .. } => write!(f, "network: {msg}"),
Error::CacheMiss { msg } => write!(f, "cache miss: {msg}"),
Error::CacheInvalid { msg } => write!(f, "cache invalid: {msg}"),
Error::Bake { msg, .. } => write!(f, "bake: {msg}"),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Image { source, .. }
| Error::Vm { source, .. }
| Error::Assets { source, .. }
| Error::Network { source, .. }
| Error::Bake { source, .. } => {
source.as_ref().map(|s| s.as_ref() as &(dyn std::error::Error + 'static))
}
Error::Io(e) => Some(e),
Error::CacheMiss { .. } | Error::CacheInvalid { .. } => None,
}
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Io(e)
}
}
impl From<WarmPoolError> for Error {
fn from(e: WarmPoolError) -> Self {
Error::Vm {
msg: e.to_string(),
source: Some(Box::new(e)),
}
}
}
impl From<PoolClientError> for Error {
fn from(e: PoolClientError) -> Self {
Error::Vm {
msg: e.to_string(),
source: Some(Box::new(e)),
}
}
}
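/// Controls whether `Image::from_oci*` and `OciImageBuilder::build` reuse a
/// cached snapshot or trigger a fresh bake.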
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PullPolicy {
Always,
Missing,
Never,
}
impl Default for PullPolicy {
fn default() -> Self {
Self::Missing
}
}
impl PullPolicy {
fn as_bake_str(self) -> &'static str {
match self {
Self::Always => "always",
Self::Missing => "missing",
Self::Never => "never",
}
}
}
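/// A baked VM image: a restorable snapshot plus the block-device layers and
/// metadata needed to boot copies of it.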
#[derive(Debug, Clone)]
pub struct Image {
snapshot_path: PathBuf,
pub(crate) memory_mib: u32,
pub(crate) vcpus: u32,
pub(crate) layers: Vec<PathBuf>,
pub(crate) delta_squashfs: Option<PathBuf>,
pub(crate) bundled_kernel: Option<PathBuf>,
pub(crate) hidden_pool: std::sync::OnceLock<Arc<HiddenPool>>,
}
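/// Internal warm pool of pre-restored worker processes. Workers are single-use:
/// `acquire` hands one out, `release` kills it, and the replenisher thread
/// spawns replacements to keep `target` workers alive.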
#[doc(hidden)]
pub struct HiddenPool {
state: Mutex<PoolState>,
available: Condvar,
socks_dir: PathBuf,
shutting_down: AtomicBool,
replenisher: Option<JoinHandle<()>>,
spawn_cfg: Arc<SpawnConfig>,
}
struct PoolState {
idle: VecDeque<Worker>,
alive: usize,
target: usize,
}
struct Worker {
child: Child,
vsock_mux_path: PathBuf,
vsock_exec_path: PathBuf,
}
impl Worker {
fn kill(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
let _ = std::fs::remove_file(&self.vsock_mux_path);
let _ = std::fs::remove_file(&self.vsock_exec_path);
let mut h = self.vsock_mux_path.clone();
h.set_extension("handoff");
let _ = std::fs::remove_file(&h);
}
}
struct SpawnConfig {
worker_bin: PathBuf,
snapshot_path: PathBuf,
layers: Vec<PathBuf>,
delta_squashfs: Option<PathBuf>,
memory_mib: u32,
vcpus: u32,
socks_dir: PathBuf,
name_prefix: String,
spawn_timeout: Duration,
}
impl SpawnConfig {
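    /// Spawns one worker process and waits for its vsock mux socket to appear,
    /// returning the live worker or killing it and erroring on failure.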
fn spawn_one(&self) -> Result<Worker, Error> {
let suffix = (unique_suffix() & 0xffff_ffff) as u32;
let vsock_mux_path = self
.socks_dir
.join(format!("{}-{:08x}.sock", self.name_prefix, suffix));
let vsock_exec_path = {
let mut p = vsock_mux_path.clone();
let mut name = p.file_name().unwrap().to_owned();
name.push("-exec");
p.set_file_name(name);
p
};
let _ = std::fs::remove_file(&vsock_mux_path);
let _ = std::fs::remove_file(&vsock_exec_path);
let mut cmd = Command::new(&self.worker_bin);
for layer in &self.layers {
cmd.arg("--virtio-blk").arg(layer);
}
if let Some(delta) = &self.delta_squashfs {
cmd.arg("--virtio-blk").arg(delta);
}
cmd.arg("--memory").arg(self.memory_mib.to_string());
cmd.arg("--vcpus").arg(self.vcpus.to_string());
cmd.arg("--restore-from").arg(&self.snapshot_path);
cmd.arg("--cow-restore");
cmd.arg("--vsock-mux").arg(&vsock_mux_path);
cmd.arg("--vsock-exec").arg(&vsock_exec_path);
let log_to_stdio = std::env::var("SUPERMACHINE_WORKER_LOG")
.map(|v| v == "1" || v == "true")
.unwrap_or(false);
if !log_to_stdio {
cmd.stdout(Stdio::null()).stderr(Stdio::null());
}
        let mut child = cmd
            .spawn()
            .map_err(|e| Error::vm_msg(format!("spawn worker {}: {e}", self.worker_bin.display())))?;
        // Wait for the worker's vsock mux socket to appear, giving up early if
        // the worker process exits or the spawn timeout elapses.
        let deadline = Instant::now() + self.spawn_timeout;
        while !vsock_mux_path.exists() {
            let exited = matches!(child.try_wait(), Ok(Some(_)));
            if exited || Instant::now() > deadline {
                let mut w = Worker {
                    child,
                    vsock_mux_path: vsock_mux_path.clone(),
                    vsock_exec_path: vsock_exec_path.clone(),
                };
                w.kill();
                return Err(Error::vm_msg(format!(
                    "worker spawn: vsock socket {} did not appear within {:?}{}",
                    vsock_mux_path.display(),
                    self.spawn_timeout,
                    if exited { " (worker exited early)" } else { "" }
                )));
            }
            std::thread::sleep(Duration::from_millis(10));
        }
Ok(Worker {
child,
vsock_mux_path,
vsock_exec_path,
})
}
}
impl std::fmt::Debug for HiddenPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = self.state.lock().ok();
f.debug_struct("HiddenPool")
.field("socks_dir", &self.socks_dir)
.field(
"alive",
&s.as_ref().map(|s| s.alive).unwrap_or(usize::MAX),
)
.field(
"idle",
&s.as_ref().map(|s| s.idle.len()).unwrap_or(usize::MAX),
)
.finish()
}
}
impl Drop for HiddenPool {
fn drop(&mut self) {
self.shutting_down.store(true, Ordering::SeqCst);
self.available.notify_all();
if let Ok(mut s) = self.state.lock() {
while let Some(mut w) = s.idle.pop_front() {
w.kill();
s.alive = s.alive.saturating_sub(1);
}
}
if let Some(h) = self.replenisher.take() {
let _ = h.join();
}
let _ = std::fs::remove_dir_all(&self.socks_dir);
}
}
impl HiddenPool {
fn acquire(&self) -> Result<Worker, Error> {
let mut state = self
.state
.lock()
.map_err(|_| Error::vm_msg("pool mutex poisoned".to_owned()))?;
loop {
if let Some(w) = state.idle.pop_front() {
return Ok(w);
}
if self.shutting_down.load(Ordering::SeqCst) {
return Err(Error::vm_msg("pool is shutting down".to_owned()));
}
state = self
.available
.wait(state)
.map_err(|_| Error::vm_msg("pool condvar poisoned".to_owned()))?;
}
}
fn release(&self, mut worker: Worker) {
worker.kill();
if let Ok(mut s) = self.state.lock() {
s.alive = s.alive.saturating_sub(1);
}
self.available.notify_all();
}
}
impl Image {
pub fn from_snapshot(path: impl Into<PathBuf>) -> Result<Self, Error> {
let path = path.into();
let (snapshot_path, metadata_path) = if path.is_dir() {
(path.join("restore.snap"), path.join("metadata.json"))
} else if path.is_file() {
let parent = path.parent().ok_or_else(|| {
Error::image_msg(format!("snapshot path has no parent dir: {}", path.display()))
})?;
(path.clone(), parent.join("metadata.json"))
} else {
return Err(Error::image_msg(format!(
"snapshot path not found: {}",
path.display()
)));
};
if !snapshot_path.is_file() {
return Err(Error::image_msg(format!(
"snapshot file not found: {}",
snapshot_path.display()
)));
}
if !metadata_path.is_file() {
return Err(Error::image_msg(format!(
"metadata.json not found alongside snapshot at {}",
metadata_path.display()
)));
}
let meta_text = std::fs::read_to_string(&metadata_path)
.map_err(|e| Error::image_msg(format!("read {}: {e}", metadata_path.display())))?;
let meta: serde_json::Value = serde_json::from_str(&meta_text)
.map_err(|e| Error::image_msg(format!("parse {}: {e}", metadata_path.display())))?;
let memory_mib = meta
.get("memory_mib")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(256);
let vcpus = meta
.get("vcpus")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(1);
let metadata_dir = metadata_path
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| PathBuf::from("."));
let resolve_path = |s: &str| -> PathBuf {
let p = PathBuf::from(s);
if p.is_absolute() {
p
} else {
metadata_dir.join(p)
}
};
let layers: Vec<PathBuf> = meta
.get("layers")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(&resolve_path))
.collect()
})
.unwrap_or_default();
let delta_squashfs = meta
.get("delta_squashfs")
.and_then(|v| v.as_str())
.map(&resolve_path);
let bundled_kernel = {
let cand = metadata_dir.join("kernel");
if cand.is_file() {
Some(cand)
} else {
None
}
};
Ok(Self {
snapshot_path,
memory_mib,
vcpus,
layers,
delta_squashfs,
bundled_kernel,
hidden_pool: std::sync::OnceLock::new(),
})
}
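    /// Loads a cached snapshot for `image_ref`, baking one first if needed
    /// (default pull policy: [`PullPolicy::Missing`]).
    ///
    /// A minimal sketch; the `supermachine` crate-root re-exports are an
    /// assumption, so the example is marked `ignore`:
    ///
    /// ```ignore
    /// # fn main() -> Result<(), supermachine::Error> {
    /// use supermachine::{Image, VmConfig};
    ///
    /// let image = Image::from_oci("docker.io/library/alpine:3.19")?;
    /// let vm = image.start(&VmConfig::new())?;
    /// vm.stop()?;
    /// # Ok(()) }
    /// ```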
pub fn from_oci(image_ref: &str) -> Result<Self, Error> {
Self::from_oci_with_policy(image_ref, PullPolicy::default())
}
pub fn from_oci_with_policy(
image_ref: &str,
policy: PullPolicy,
) -> Result<Self, Error> {
let snapshots_dir = default_snapshots_dir();
Self::from_oci_to_dir(image_ref, policy, &snapshots_dir, None)
}
pub fn from_oci_to_dir(
image_ref: &str,
policy: PullPolicy,
snapshots_dir: &Path,
name: Option<&str>,
) -> Result<Self, Error> {
let derived = name
.map(|s| s.to_owned())
.unwrap_or_else(|| crate::bake::snapshot_name_for_image(image_ref));
let snap_dir = snapshots_dir.join(&derived);
let cache_loadable = Self::from_snapshot(&snap_dir).is_ok();
match policy {
PullPolicy::Never => {
if cache_loadable {
return Self::from_snapshot(&snap_dir);
}
let restore_snap = snap_dir.join("restore.snap");
if restore_snap.is_file() {
return Err(Error::cache_invalid(format!(
"snapshot present at {} but not loadable on this binary; \
rebake required (PullPolicy::Never won't auto-rebake)",
snap_dir.display()
)));
}
return Err(Error::cache_miss(format!(
"no cached snapshot for {image_ref} at {} (PullPolicy::Never)",
snap_dir.display()
)));
}
PullPolicy::Missing if cache_loadable => {
return Self::from_snapshot(&snap_dir);
}
_ => {}
}
let root = repo_root_for_bake()?;
let request = crate::bake::BakeRequest {
image: image_ref.to_owned(),
name: name.map(|s| s.to_owned()),
runtime: "supermachine".to_owned(),
guest_port: 80,
memory_mib: 256,
vcpus: 1,
pull_policy: policy.as_bake_str().to_owned(),
snapshots_dir: snapshots_dir.to_path_buf(),
cmd_override: None,
extra_args: Vec::new(),
};
let bake_t0 = std::time::Instant::now();
crate::bake::run_push(&request, bake_t0, &root).map_err(map_bake_error)?;
Self::from_snapshot(&snap_dir)
}
pub fn builder(image_ref: impl Into<String>) -> OciImageBuilder {
OciImageBuilder::new(image_ref)
}
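    /// Convenience wrapper around [`OciImageBuilder`]: applies `name`, lets
    /// `configure` adjust the builder, then bakes (or reuses) the image.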
pub fn ensure_baked<F>(
name: impl Into<String>,
image_ref: impl Into<String>,
configure: F,
) -> Result<Image, Error>
where
F: FnOnce(OciImageBuilder) -> OciImageBuilder,
{
let builder = configure(
OciImageBuilder::new(image_ref).with_name(name),
);
builder.build()
}
pub fn snapshot_path(&self) -> &Path {
&self.snapshot_path
}
pub fn memory_mib(&self) -> u32 {
self.memory_mib
}
pub fn vcpus(&self) -> u32 {
self.vcpus
}
pub fn start(&self, config: &VmConfig) -> Result<Vm, Error> {
Vm::start(self, config)
}
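    /// Borrows a warm, already-restored VM from this image's pool, creating the
    /// pool with default [`VmConfig`] settings on first use.
    ///
    /// A minimal sketch (crate-root re-exports assumed, hence `ignore`):
    ///
    /// ```ignore
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let image = supermachine::Image::from_oci("docker.io/library/alpine:3.19")?;
    /// let vm = image.acquire()?;            // PooledVm, derefs to Vm
    /// let _child = vm.exec(["/bin/true"])?;
    /// drop(vm);                             // worker is retired and replaced
    /// # Ok(()) }
    /// ```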
pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
self.acquire_with(&VmConfig::new())
}
pub fn acquire_with(&self, config: &VmConfig) -> Result<PooledVm<'_>, Error> {
let pool_arc = self.ensure_pool(config)?;
let worker = pool_arc.acquire()?;
let vm = Vm {
pool: None,
vsock_mux_path: worker.vsock_mux_path.clone(),
vsock_exec_path: worker.vsock_exec_path.clone(),
own_vsock_mux_dir: None,
skip_cleanup: true,
image_meta: None,
};
Ok(PooledVm {
vm: Some(vm),
worker: Some(worker),
pool_arc: Arc::clone(pool_arc),
_image: std::marker::PhantomData,
})
}
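    /// Lazily initializes the per-image warm pool on first use. The pool size
    /// comes from `VmConfig::with_pool_warm`, then `SUPERMACHINE_POOL_WARM`,
    /// then defaults to 1 (never less than 1).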
fn ensure_pool(
&self,
config: &VmConfig,
) -> Result<&Arc<HiddenPool>, Error> {
if let Some(p) = self.hidden_pool.get() {
return Ok(p);
}
let target = config.pool_warm.unwrap_or_else(|| {
std::env::var("SUPERMACHINE_POOL_WARM")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1usize)
}).max(1);
#[cfg(target_os = "macos")]
let worker_bin = crate::codesign::locate_worker_bin().ok_or_else(|| {
Error::assets_msg(
"supermachine-worker binary not found (looked for sibling of \
current_exe and target/release/supermachine-worker). Set \
SUPERMACHINE_WORKER_BIN if you have it elsewhere."
.to_owned(),
)
})?;
#[cfg(not(target_os = "macos"))]
let worker_bin: PathBuf = std::env::var_os("SUPERMACHINE_WORKER_BIN")
.map(PathBuf::from)
.ok_or_else(|| {
Error::assets_msg(
"SUPERMACHINE_WORKER_BIN must be set on this platform".to_owned(),
)
})?;
#[cfg(target_os = "macos")]
{
let _ = crate::codesign::ensure_worker_signed(&worker_bin);
}
let socks_dir = match &config.vsock_mux_dir {
Some(d) => d.clone(),
None => PathBuf::from(format!(
"/tmp/sm-pool-{}",
std::process::id(),
)),
};
std::fs::create_dir_all(&socks_dir).map_err(Error::Io)?;
let memory_mib = config.memory_mib.unwrap_or(self.memory_mib);
let vcpus = config.vcpus.unwrap_or(self.vcpus);
let spawn_timeout = config
.restore_timeout
.unwrap_or_else(|| Duration::from_secs(30));
let name_prefix = "w".to_owned();
let spawn_cfg = Arc::new(SpawnConfig {
worker_bin,
snapshot_path: self.snapshot_path.clone(),
layers: self.layers.clone(),
delta_squashfs: self.delta_squashfs.clone(),
memory_mib,
vcpus,
socks_dir: socks_dir.clone(),
name_prefix,
spawn_timeout,
});
        let mut idle = VecDeque::with_capacity(target);
        for _ in 0..target {
            match spawn_cfg.spawn_one() {
                Ok(w) => idle.push_back(w),
                // Kill already-started workers so a failed spawn doesn't leak them.
                Err(e) => { idle.iter_mut().for_each(Worker::kill); return Err(e); }
            }
        }
let pool = Arc::new(HiddenPool {
state: Mutex::new(PoolState {
idle,
alive: target,
target,
}),
available: Condvar::new(),
socks_dir,
shutting_down: AtomicBool::new(false),
replenisher: None,
spawn_cfg: Arc::clone(&spawn_cfg),
});
let pool_weak = Arc::downgrade(&pool);
let h = std::thread::Builder::new()
.name("supermachine-pool-replenish".into())
.spawn(move || replenisher_loop(pool_weak))
.map_err(|e| Error::vm_msg(format!("spawn replenisher thread: {e}")))?;
REPLENISHER_HANDLES.lock().unwrap().insert(
Arc::as_ptr(&pool) as usize,
h,
);
let _ = self.hidden_pool.set(pool);
Ok(self
.hidden_pool
.get()
.expect("hidden pool was just initialized"))
}
}
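/// Background loop that keeps the pool topped up: while the pool is alive and
/// not shutting down, spawn a worker whenever `alive < target`, backing off
/// briefly when a spawn fails.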
fn replenisher_loop(pool: std::sync::Weak<HiddenPool>) {
loop {
let Some(p) = pool.upgrade() else { return };
if p.shutting_down.load(Ordering::SeqCst) {
return;
}
let need_more = {
let s = match p.state.lock() {
Ok(s) => s,
Err(_) => return,
};
s.alive < s.target
};
if !need_more {
let s = match p.state.lock() {
Ok(s) => s,
Err(_) => return,
};
let _ = p.available.wait_timeout(s, Duration::from_millis(200));
continue;
}
let spawned = p.spawn_cfg.spawn_one();
match spawned {
Ok(w) => {
if let Ok(mut s) = p.state.lock() {
s.idle.push_back(w);
s.alive += 1;
p.available.notify_all();
}
}
Err(_) => {
std::thread::sleep(Duration::from_millis(500));
}
}
}
}
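/// Join handles for replenisher threads, kept in a process-global map keyed by
/// the address of the pool they serve rather than inside `HiddenPool` itself.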
static REPLENISHER_HANDLES: std::sync::LazyLock<
std::sync::Mutex<std::collections::HashMap<usize, std::thread::JoinHandle<()>>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(std::collections::HashMap::new()));
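/// A VM borrowed from an [`Image`]'s warm pool. Derefs to [`Vm`]; dropping it
/// returns the slot to the pool (the single-use worker is killed and replaced).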
pub struct PooledVm<'a> {
vm: Option<Vm>,
worker: Option<Worker>,
pool_arc: Arc<HiddenPool>,
_image: std::marker::PhantomData<&'a Image>,
}
impl<'a> std::ops::Deref for PooledVm<'a> {
type Target = Vm;
fn deref(&self) -> &Vm {
self.vm.as_ref().expect("PooledVm used after drop")
}
}
impl<'a> std::ops::DerefMut for PooledVm<'a> {
fn deref_mut(&mut self) -> &mut Vm {
self.vm.as_mut().expect("PooledVm used after drop")
}
}
impl<'a> Drop for PooledVm<'a> {
fn drop(&mut self) {
let _ = self.vm.take();
if let Some(worker) = self.worker.take() {
self.pool_arc.release(worker);
}
}
}
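/// Builder for baking an OCI image into a snapshot-backed [`Image`], with
/// optional overrides for resources, command, environment, and cache location.
///
/// A minimal sketch (crate-root re-exports are an assumption, hence `ignore`):
///
/// ```ignore
/// # fn main() -> Result<(), supermachine::Error> {
/// use supermachine::{Image, PullPolicy};
///
/// let image = Image::builder("docker.io/library/nginx:alpine")
///     .with_name("nginx-demo")
///     .with_guest_port(80)
///     .with_memory_mib(512)
///     .with_pull_policy(PullPolicy::Missing)
///     .build()?;
/// # Ok(()) }
/// ```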
pub struct OciImageBuilder {
image: String,
name: Option<String>,
pull_policy: PullPolicy,
memory_mib: Option<u32>,
vcpus: Option<u32>,
guest_port: Option<u16>,
cmd: Option<Vec<String>>,
envs: Vec<(String, String)>,
snapshots_dir: Option<PathBuf>,
}
impl OciImageBuilder {
pub fn new(image_ref: impl Into<String>) -> Self {
Self {
image: image_ref.into(),
name: None,
pull_policy: PullPolicy::default(),
memory_mib: None,
vcpus: None,
guest_port: None,
cmd: None,
envs: Vec::new(),
snapshots_dir: None,
}
}
pub fn with_vcpus(mut self, vcpus: u32) -> Self {
self.vcpus = Some(vcpus);
self
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub fn with_pull_policy(mut self, policy: PullPolicy) -> Self {
self.pull_policy = policy;
self
}
pub fn with_memory_mib(mut self, mib: u32) -> Self {
self.memory_mib = Some(mib);
self
}
pub fn with_guest_port(mut self, port: u16) -> Self {
self.guest_port = Some(port);
self
}
pub fn with_cmd<I, S>(mut self, cmd: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.cmd = Some(cmd.into_iter().map(Into::into).collect());
self
}
pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.envs.push((key.into(), value.into()));
self
}
pub fn with_snapshots_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.snapshots_dir = Some(dir.into());
self
}
pub fn build(self) -> Result<Image, Error> {
let snapshots_dir = self
.snapshots_dir
.unwrap_or_else(default_snapshots_dir);
let derived_name = self
.name
.clone()
.unwrap_or_else(|| crate::bake::snapshot_name_for_image(&self.image));
let snap_dir = snapshots_dir.join(&derived_name);
let cache_loadable = Image::from_snapshot(&snap_dir).is_ok();
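        // Only `Never` short-circuits on the cache here; for other policies the
        // bake request below is always issued with the pull-policy string attached.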
match self.pull_policy {
PullPolicy::Never => {
if cache_loadable {
return Image::from_snapshot(&snap_dir);
}
let restore_snap = snap_dir.join("restore.snap");
if restore_snap.is_file() {
return Err(Error::cache_invalid(format!(
"snapshot present at {} but not loadable on this binary; \
rebake required (PullPolicy::Never won't auto-rebake)",
snap_dir.display()
)));
}
return Err(Error::cache_miss(format!(
"no cached snapshot for {} at {} (PullPolicy::Never)",
self.image,
snap_dir.display()
)));
}
_ => {}
}
let mut extra_args: Vec<String> = Vec::new();
for (k, v) in &self.envs {
extra_args.push("--env".to_owned());
extra_args.push(format!("{k}={v}"));
}
let cmd_override = match &self.cmd {
Some(argv) => Some(
serde_json::to_string(argv)
.map_err(|e| Error::bake_msg(format!("encode cmd: {e}")))?,
),
None => None,
};
let root = repo_root_for_bake()?;
let request = crate::bake::BakeRequest {
image: self.image.clone(),
name: self.name.clone(),
runtime: "supermachine".to_owned(),
guest_port: self.guest_port.unwrap_or(80),
memory_mib: self.memory_mib.unwrap_or(256),
vcpus: self.vcpus.unwrap_or(1),
pull_policy: self.pull_policy.as_bake_str().to_owned(),
snapshots_dir: snapshots_dir.clone(),
cmd_override,
extra_args,
};
let bake_t0 = std::time::Instant::now();
crate::bake::run_push(&request, bake_t0, &root).map_err(map_bake_error)?;
Image::from_snapshot(&snap_dir)
}
}
fn default_snapshots_dir() -> PathBuf {
if let Some(d) = std::env::var_os("SUPERMACHINE_SNAPSHOTS") {
return PathBuf::from(d);
}
let home = std::env::var_os("HOME")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("."));
home.join(".local/supermachine-snapshots")
}
fn repo_root_for_bake() -> Result<PathBuf, Error> {
if let Some(root) = std::env::var_os("SUPERMACHINE_ROOT") {
return Ok(PathBuf::from(root));
}
let exe = std::env::current_exe()
.map_err(|e| Error::bake_msg(format!("current_exe: {e}")))?;
for ancestor in exe.ancestors() {
if ancestor.join("tools/supermachine-push").is_file() {
return Ok(ancestor.to_path_buf());
}
if ancestor.join("share/supermachine/kernel").is_file() {
return Ok(ancestor.to_path_buf());
}
}
std::env::current_dir().map_err(|e| Error::bake_msg(format!("current_dir: {e}")))
}
fn map_bake_error(msg: String) -> Error {
    // Registry/auth-style failures are surfaced as Network errors; everything
    // else from the bake pipeline is reported as a Bake error.
    let lc = msg.to_ascii_lowercase();
    if lc.contains("registry")
        || lc.contains("manifest")
        || lc.contains("docker pull")
        || lc.contains("auth")
    {
        Error::network_msg(msg)
    } else {
        Error::bake_msg(msg)
    }
}
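/// Per-start overrides for a VM: resources, asset paths, socket directory,
/// restore timeout, and warm-pool size.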
#[derive(Debug, Clone, Default)]
pub struct VmConfig {
memory_mib: Option<u32>,
vcpus: Option<u32>,
assets: Option<AssetPaths>,
vsock_mux_dir: Option<PathBuf>,
restore_timeout: Option<Duration>,
pub(crate) pool_warm: Option<usize>,
}
impl VmConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_memory_mib(mut self, mib: u32) -> Self {
self.memory_mib = Some(mib);
self
}
pub fn with_vcpus(mut self, vcpus: u32) -> Self {
self.vcpus = Some(vcpus);
self
}
pub fn with_assets(mut self, assets: AssetPaths) -> Self {
self.assets = Some(assets);
self
}
pub fn with_vsock_mux_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.vsock_mux_dir = Some(dir.into());
self
}
pub fn with_restore_timeout(mut self, timeout: Duration) -> Self {
self.restore_timeout = Some(timeout);
self
}
pub fn with_pool_warm(mut self, n: usize) -> Self {
self.pool_warm = Some(n.max(1));
self
}
}
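/// A handle to a running microVM: exec, file transfer, raw vsock access,
/// TCP forwarding, snapshotting, and shutdown.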
pub struct Vm {
pool: Option<WarmPool>,
vsock_mux_path: PathBuf,
vsock_exec_path: PathBuf,
own_vsock_mux_dir: Option<PathBuf>,
skip_cleanup: bool,
image_meta: Option<Arc<ImageMeta>>,
}
#[derive(Clone, Debug)]
pub(crate) struct ImageMeta {
pub memory_mib: u32,
pub vcpus: u32,
pub layers: Vec<PathBuf>,
pub delta_squashfs: Option<PathBuf>,
}
impl Vm {
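    /// Boots a fresh VM in-process by restoring `image`'s snapshot, using
    /// `config` overrides where given and the image's metadata otherwise.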
pub fn start(image: &Image, config: &VmConfig) -> Result<Vm, Error> {
#[cfg(target_os = "macos")]
if let Err(msg) = crate::codesign::check_self_has_hvf_entitlement() {
return Err(Error::vm_msg(msg));
}
let assets = match &config.assets {
Some(a) => a.clone(),
None => AssetPaths::discover(),
};
let kernel: PathBuf = if let Some(k) = image.bundled_kernel.as_ref() {
k.clone()
} else if let Some(k) = assets.kernel.as_ref() {
k.clone()
} else {
return Err(Error::assets_msg(
"no kernel found: snapshot dir has no bundled kernel and AssetPaths::discover() came up empty; set VmConfig::with_assets() or $SUPERMACHINE_ASSETS_DIR".to_owned(),
));
};
let kernel = kernel.as_path();
let dir = match &config.vsock_mux_dir {
Some(d) => d.clone(),
None => std::env::temp_dir(),
};
let mut own_dir = None;
if !dir.is_dir() {
std::fs::create_dir_all(&dir).map_err(Error::Io)?;
own_dir = Some(dir.clone());
}
let vsock_mux_path = dir.join(format!(
"supermachine-vm-{}-{}.sock",
std::process::id(),
unique_suffix(),
));
let vsock_exec_path = {
let mut p = vsock_mux_path.clone();
let mut name = p.file_name().unwrap().to_owned();
name.push("-exec");
p.set_file_name(name);
p
};
let memory_mib = config.memory_mib.unwrap_or(image.memory_mib);
let vcpus = config.vcpus.unwrap_or(image.vcpus);
let mut resources = VmResources::new()
.with_kernel_path(kernel.to_string_lossy().to_string())
.with_memory_mib(memory_mib as usize)
.with_vcpus(vcpus)
.with_cow_restore(true)
.with_restore(image.snapshot_path.to_string_lossy().to_string())
.with_vsock_mux(vsock_mux_path.to_string_lossy().to_string())
.with_vsock_exec(vsock_exec_path.to_string_lossy().to_string());
for layer in &image.layers {
resources = resources.with_block_device(layer.to_string_lossy().to_string());
}
if let Some(delta) = &image.delta_squashfs {
resources = resources.with_block_device(delta.to_string_lossy().to_string());
}
let options = RunOptions::default();
let pool = WarmPool::start(resources, options).map_err(Error::from)?;
let timeout = config
.restore_timeout
.unwrap_or_else(|| Duration::from_secs(10));
let _ = pool
.restore_timeout(image.snapshot_path.to_string_lossy().to_string(), timeout)
.map_err(Error::from)?;
Ok(Vm {
pool: Some(pool),
vsock_mux_path,
vsock_exec_path,
own_vsock_mux_dir: own_dir,
skip_cleanup: false,
image_meta: Some(Arc::new(ImageMeta {
memory_mib,
vcpus,
layers: image.layers.clone(),
delta_squashfs: image.delta_squashfs.clone(),
})),
})
}
pub fn vsock_path(&self) -> &Path {
&self.vsock_mux_path
}
pub fn exec_path(&self) -> &Path {
&self.vsock_exec_path
}
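    /// Spawns `argv` inside the guest over the exec vsock socket, e.g.
    /// `vm.exec(["/bin/sh", "-c", "echo hello"])`.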
pub fn exec<I, S>(&self, argv: I) -> std::io::Result<crate::exec::ExecChild>
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.exec_builder().argv(argv).spawn()
}
pub fn exec_builder(&self) -> crate::exec::ExecBuilder {
crate::exec::ExecBuilder::new(self.vsock_exec_path.clone())
}
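    /// Writes `bytes` to `path` inside the guest via a base64-encoded control
    /// message on the exec socket.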
pub fn write_file(&self, path: &str, bytes: &[u8]) -> std::io::Result<()> {
let body = serde_json::json!({
"action": "write_file",
"path": path,
"data_b64": b64_encode(bytes),
});
crate::exec::send_control(&self.vsock_exec_path, &body)
}
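    /// Reads a file from the guest; the agent's ack carries base64 data which
    /// is decoded locally.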
pub fn read_file(&self, path: &str) -> std::io::Result<Vec<u8>> {
let body = serde_json::json!({
"action": "read_file",
"path": path,
});
let ack = crate::exec::send_control_with_ack(
&self.vsock_exec_path,
&body,
Some(std::time::Duration::from_secs(30)),
)?;
let data_b64 = ack
.get("data_b64")
.and_then(|v| v.as_str())
.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"read_file: agent ack missing data_b64",
)
})?;
b64_decode(data_b64)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}
pub fn workload_signal(&self, signum: i32) -> std::io::Result<()> {
let body = serde_json::json!({
"action": "signal",
"signum": signum,
});
crate::exec::send_control(&self.vsock_exec_path, &body)
}
pub fn connect(&self) -> std::io::Result<UnixStream> {
UnixStream::connect(&self.vsock_mux_path)
}
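    /// Binds a local TCP listener on `127.0.0.1:host_port` and splices each
    /// accepted connection into the VM's vsock mux socket. The `_guest_port`
    /// argument is currently unused by this forwarder.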
pub fn expose_tcp(&self, host_port: u16, _guest_port: u16) -> std::io::Result<TcpForwarder> {
let listener = TcpListener::bind(("127.0.0.1", host_port))?;
let bound = listener.local_addr()?;
listener.set_nonblocking(false)?;
let stop = Arc::new(AtomicBool::new(false));
let stop_thread = stop.clone();
let vsock_path = self.vsock_mux_path.clone();
let handle = std::thread::Builder::new()
.name(format!("supermachine-tcp-{host_port}"))
.spawn(move || {
accept_loop(listener, vsock_path, stop_thread);
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Ok(TcpForwarder {
stop,
handle: Some(handle),
bound,
})
}
pub fn stop(mut self) -> Result<(), Error> {
if let Some(pool) = self.pool.take() {
let _ = pool.shutdown().map_err(Error::from)?;
}
self.cleanup_socket();
Ok(())
}
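    /// Captures the running VM into `dest_dir` (writing `restore.snap` and
    /// `metadata.json`) and returns it as a new [`Image`]. Only works for VMs
    /// started with [`Image::start`], not pooled VMs.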
pub fn snapshot(mut self, dest_dir: impl Into<PathBuf>) -> Result<Image, Error> {
let dest_dir = dest_dir.into();
let meta = self.image_meta.clone().ok_or_else(|| {
Error::vm_msg(
"Vm::snapshot requires an in-process Vm (use image.start, not image.acquire)"
.to_owned(),
)
})?;
let pool = self.pool.as_ref().ok_or_else(|| {
Error::vm_msg("Vm::snapshot: no pool to drive the capture".to_owned())
})?;
std::fs::create_dir_all(&dest_dir).map_err(Error::Io)?;
let snap_path = dest_dir.join("restore.snap");
let _result = pool
.snapshot_timeout(
snap_path.to_string_lossy().to_string(),
Duration::from_secs(60),
)
.map_err(|e| Error::Vm {
msg: format!("snapshot capture failed: {e:?}"),
source: None,
})?;
let metadata = serde_json::json!({
"memory_mib": meta.memory_mib,
"vcpus": meta.vcpus,
"layers": meta
.layers
.iter()
.map(|p| p.to_string_lossy().to_string())
.collect::<Vec<_>>(),
"delta_squashfs": meta
.delta_squashfs
.as_ref()
.map(|p| p.to_string_lossy().to_string()),
"snapshot_base": snap_path.to_string_lossy().to_string(),
"baked_at": chrono_rfc3339_now(),
"source": "Vm::snapshot",
});
std::fs::write(
dest_dir.join("metadata.json"),
serde_json::to_string_pretty(&metadata)
.map_err(|e| Error::vm_msg(format!("metadata serialize: {e}")))?,
)
.map_err(Error::Io)?;
if let Some(pool) = self.pool.take() {
let _ = pool.shutdown();
}
self.cleanup_socket();
self.skip_cleanup = true;
Image::from_snapshot(&dest_dir)
}
fn cleanup_socket(&self) {
let _ = std::fs::remove_file(&self.vsock_mux_path);
let _ = std::fs::remove_file(&self.vsock_exec_path);
if let Some(dir) = &self.own_vsock_mux_dir {
let _ = std::fs::remove_dir(dir);
}
}
}
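// Minimal padded base64 (standard RFC 4648 alphabet) implemented locally so the
// control-channel file transfer needs no external crate.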
const B64_ALPHA: &[u8; 64] =
b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
pub(crate) fn b64_encode(bytes: &[u8]) -> String {
let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
let mut i = 0;
while i + 3 <= bytes.len() {
let b0 = bytes[i] as u32;
let b1 = bytes[i + 1] as u32;
let b2 = bytes[i + 2] as u32;
let n = (b0 << 16) | (b1 << 8) | b2;
out.push(B64_ALPHA[((n >> 18) & 0x3f) as usize] as char);
out.push(B64_ALPHA[((n >> 12) & 0x3f) as usize] as char);
out.push(B64_ALPHA[((n >> 6) & 0x3f) as usize] as char);
out.push(B64_ALPHA[(n & 0x3f) as usize] as char);
i += 3;
}
let rem = bytes.len() - i;
if rem == 1 {
let b0 = bytes[i] as u32;
let n = b0 << 16;
out.push(B64_ALPHA[((n >> 18) & 0x3f) as usize] as char);
out.push(B64_ALPHA[((n >> 12) & 0x3f) as usize] as char);
out.push('=');
out.push('=');
} else if rem == 2 {
let b0 = bytes[i] as u32;
let b1 = bytes[i + 1] as u32;
let n = (b0 << 16) | (b1 << 8);
out.push(B64_ALPHA[((n >> 18) & 0x3f) as usize] as char);
out.push(B64_ALPHA[((n >> 12) & 0x3f) as usize] as char);
out.push(B64_ALPHA[((n >> 6) & 0x3f) as usize] as char);
out.push('=');
}
out
}
pub(crate) fn b64_decode(s: &str) -> Result<Vec<u8>, String> {
let mut tbl = [255u8; 256];
for (i, &b) in B64_ALPHA.iter().enumerate() {
tbl[b as usize] = i as u8;
}
let bytes: Vec<u8> = s.bytes().filter(|b| !b.is_ascii_whitespace()).collect();
if bytes.len() % 4 != 0 {
return Err(format!("base64 length {} is not a multiple of 4", bytes.len()));
}
let mut out = Vec::with_capacity(bytes.len() / 4 * 3);
for chunk in bytes.chunks_exact(4) {
let v: [u8; 4] = chunk.try_into().unwrap();
let pad = v.iter().filter(|&&b| b == b'=').count();
let mut acc: u32 = 0;
for &b in &v {
let d = if b == b'=' { 0 } else { tbl[b as usize] };
if b != b'=' && d == 255 {
return Err(format!("invalid base64 character {:#x}", b));
}
acc = (acc << 6) | (d as u32);
}
out.push(((acc >> 16) & 0xff) as u8);
if pad < 2 {
out.push(((acc >> 8) & 0xff) as u8);
}
if pad < 1 {
out.push((acc & 0xff) as u8);
}
}
Ok(out)
}
impl Drop for Vm {
fn drop(&mut self) {
if self.skip_cleanup {
return;
}
if let Some(pool) = self.pool.take() {
let _ = pool.shutdown();
}
self.cleanup_socket();
}
}
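/// Handle for a host-side TCP listener that forwards connections into a VM.
/// Dropping it (or calling [`TcpForwarder::stop`]) shuts the forwarder down.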
pub struct TcpForwarder {
stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
bound: SocketAddr,
}
impl TcpForwarder {
pub fn local_addr(&self) -> SocketAddr {
self.bound
}
pub fn stop(mut self) {
self.shutdown();
}
fn shutdown(&mut self) {
self.stop.store(true, Ordering::SeqCst);
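        // Poke the listener with a throwaway connection so the blocking accept
        // loop wakes up and observes the stop flag.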
let _ = TcpStream::connect_timeout(&self.bound, Duration::from_millis(200));
if let Some(h) = self.handle.take() {
let _ = h.join();
}
}
}
impl Drop for TcpForwarder {
fn drop(&mut self) {
self.shutdown();
}
}
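/// Accepts TCP connections until the stop flag is observed, spawning one
/// splice thread per connection.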
fn accept_loop(listener: TcpListener, vsock_path: PathBuf, stop: Arc<AtomicBool>) {
for incoming in listener.incoming() {
if stop.load(Ordering::SeqCst) {
break;
}
let tcp = match incoming {
Ok(s) => s,
Err(_) => continue,
};
let vsock = vsock_path.clone();
std::thread::Builder::new()
.name("supermachine-tcp-conn".into())
.spawn(move || {
if let Err(e) = splice_tcp_to_unix(tcp, &vsock) {
eprintln!("supermachine: tcp forward: {e}");
}
})
.ok();
}
}
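/// Bridges one TCP connection and the vsock Unix socket with two pump threads,
/// one per direction, and waits for both to finish.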
fn splice_tcp_to_unix(tcp: TcpStream, vsock_path: &Path) -> std::io::Result<()> {
let unix = UnixStream::connect(vsock_path)?;
let tcp_w = tcp.try_clone()?;
let unix_w = unix.try_clone()?;
let t1 = std::thread::Builder::new()
.name("supermachine-tcp-c2g".into())
.spawn(move || {
let _ = pump(tcp, unix_w);
})?;
let t2 = std::thread::Builder::new()
.name("supermachine-tcp-g2c".into())
.spawn(move || {
let _ = pump(unix, tcp_w);
})?;
let _ = t1.join();
let _ = t2.join();
Ok(())
}
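/// Copies bytes from `r` to `w` until EOF, then half-closes the write side so
/// the peer observes end-of-stream.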
fn pump<R, W>(mut r: R, mut w: W) -> std::io::Result<()>
where
R: Read,
W: Write + Shutdownable,
{
let mut buf = [0u8; 16 * 1024];
loop {
let n = match r.read(&mut buf) {
Ok(0) => break,
Ok(n) => n,
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
        w.write_all(&buf[..n])?;
}
let _ = w.shutdown_write();
Ok(())
}
trait Shutdownable {
fn shutdown_write(&mut self) -> std::io::Result<()>;
}
impl Shutdownable for TcpStream {
fn shutdown_write(&mut self) -> std::io::Result<()> {
TcpStream::shutdown(self, std::net::Shutdown::Write)
}
}
impl Shutdownable for UnixStream {
fn shutdown_write(&mut self) -> std::io::Result<()> {
UnixStream::shutdown(self, std::net::Shutdown::Write)
}
}
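/// Best-effort unique suffix: nanoseconds since the Unix epoch plus a
/// process-local counter, so names stay distinct even within one nanosecond.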
fn unique_suffix() -> u64 {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
nanos.wrapping_add(COUNTER.fetch_add(1, Ordering::Relaxed))
}
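/// Formats the current UTC time as RFC 3339 without a chrono dependency, using
/// the standard days-to-civil-date conversion (Howard Hinnant's algorithm).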
fn chrono_rfc3339_now() -> String {
let secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let days = secs.div_euclid(86_400);
let sod = secs.rem_euclid(86_400);
let hh = sod / 3600;
let mm = (sod % 3600) / 60;
let ss = sod % 60;
let z = days + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = z - era * 146_097;
let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
let y = yoe + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
format!("{y:04}-{m:02}-{d:02}T{hh:02}:{mm:02}:{ss:02}Z")
}