use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
use crate::assets::AssetPaths;
use crate::vmm::pool::{PoolClientError, WarmPool, WarmPoolError};
use crate::vmm::resources::VmResources;
use crate::vmm::runner::RunOptions;
/// Errors returned by the high-level image / VM API.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
    /// A snapshot image could not be located, read, or parsed.
    Image(String),
    /// The VMM warm pool (or its client) reported a failure.
    Vm(String),
    /// Required host assets (such as a kernel) could not be found.
    Assets(String),
    /// An underlying I/O operation failed.
    Io(std::io::Error),
    /// A bake failure whose message looks registry / network related.
    Network(String),
    /// `PullPolicy::Never` was requested and no cached snapshot exists.
    CacheMiss(String),
    /// A cached snapshot exists but could not be loaded, and
    /// `PullPolicy::Never` forbids rebaking it.
    CacheInvalid(String),
    /// The bake step failed for a non-network reason.
    Bake(String),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::Image(msg) => write!(f, "image: {msg}"),
            Error::Vm(msg) => write!(f, "vm: {msg}"),
            Error::Assets(msg) => write!(f, "assets: {msg}"),
            Error::Io(e) => write!(f, "io: {e}"),
            Error::Network(msg) => write!(f, "network: {msg}"),
            Error::CacheMiss(msg) => write!(f, "cache miss: {msg}"),
            Error::CacheInvalid(msg) => write!(f, "cache invalid: {msg}"),
            Error::Bake(msg) => write!(f, "bake: {msg}"),
        }
    }
}

impl std::error::Error for Error {
    /// Exposes the wrapped `io::Error` so error-chain reporters can walk to
    /// the root cause; the string-carrying variants have no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::Io(e) => Some(e),
            Error::Image(_)
            | Error::Vm(_)
            | Error::Assets(_)
            | Error::Network(_)
            | Error::CacheMiss(_)
            | Error::CacheInvalid(_)
            | Error::Bake(_) => None,
        }
    }
}

impl From<std::io::Error> for Error {
    /// Wraps an I/O error as [`Error::Io`], keeping it as the error source.
    fn from(e: std::io::Error) -> Self {
        Error::Io(e)
    }
}
impl From<WarmPoolError> for Error {
fn from(e: WarmPoolError) -> Self {
Error::Vm(format!("{e}"))
}
}
impl From<PoolClientError> for Error {
fn from(e: PoolClientError) -> Self {
Error::Vm(format!("{e}"))
}
}
/// Controls whether resolving an OCI image may (re)bake a snapshot.
///
/// The chosen policy is also forwarded to the bake step as a lowercase string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PullPolicy {
    /// Always run the bake step, ignoring any cached snapshot.
    Always,
    /// `Image::from_oci*` reuses a loadable cached snapshot and bakes
    /// otherwise. This is the default.
    Missing,
    /// Never bake; fail if no loadable cached snapshot exists.
    Never,
}

impl Default for PullPolicy {
    /// Returns [`PullPolicy::Missing`].
    fn default() -> Self {
        PullPolicy::Missing
    }
}

impl PullPolicy {
    /// The spelling the bake step expects for this policy.
    fn as_bake_str(self) -> &'static str {
        match self {
            PullPolicy::Always => "always",
            PullPolicy::Missing => "missing",
            PullPolicy::Never => "never",
        }
    }
}
/// A restorable VM snapshot on disk, plus the settings read from its
/// `metadata.json` by `Image::from_snapshot`.
#[derive(Debug, Clone)]
pub struct Image {
/// Snapshot file handed to the VMM as the restore source.
snapshot_path: PathBuf,
/// Guest memory in MiB (`memory_mib` in metadata.json; defaults to 256).
pub(crate) memory_mib: u32,
/// Guest vCPU count (`vcpus` in metadata.json; defaults to 1).
pub(crate) vcpus: u32,
/// Block devices from metadata `layers`, attached in order by `Vm::start`.
pub(crate) layers: Vec<PathBuf>,
/// Optional metadata `delta_squashfs` device, attached after `layers`.
pub(crate) delta_squashfs: Option<PathBuf>,
/// A `kernel` file next to metadata.json; preferred over discovered assets.
pub(crate) bundled_kernel: Option<PathBuf>,
}
impl Image {
pub fn from_snapshot(path: impl Into<PathBuf>) -> Result<Self, Error> {
let path = path.into();
let (snapshot_path, metadata_path) = if path.is_dir() {
(path.join("restore.snap"), path.join("metadata.json"))
} else if path.is_file() {
let parent = path.parent().ok_or_else(|| {
Error::Image(format!("snapshot path has no parent dir: {}", path.display()))
})?;
(path.clone(), parent.join("metadata.json"))
} else {
return Err(Error::Image(format!(
"snapshot path not found: {}",
path.display()
)));
};
if !snapshot_path.is_file() {
return Err(Error::Image(format!(
"snapshot file not found: {}",
snapshot_path.display()
)));
}
if !metadata_path.is_file() {
return Err(Error::Image(format!(
"metadata.json not found alongside snapshot at {}",
metadata_path.display()
)));
}
let meta_text = std::fs::read_to_string(&metadata_path)
.map_err(|e| Error::Image(format!("read {}: {e}", metadata_path.display())))?;
let meta: serde_json::Value = serde_json::from_str(&meta_text)
.map_err(|e| Error::Image(format!("parse {}: {e}", metadata_path.display())))?;
let memory_mib = meta
.get("memory_mib")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(256);
let vcpus = meta
.get("vcpus")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(1);
let metadata_dir = metadata_path
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| PathBuf::from("."));
let resolve_path = |s: &str| -> PathBuf {
let p = PathBuf::from(s);
if p.is_absolute() {
p
} else {
metadata_dir.join(p)
}
};
let layers: Vec<PathBuf> = meta
.get("layers")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(&resolve_path))
.collect()
})
.unwrap_or_default();
let delta_squashfs = meta
.get("delta_squashfs")
.and_then(|v| v.as_str())
.map(&resolve_path);
let bundled_kernel = {
let cand = metadata_dir.join("kernel");
if cand.is_file() {
Some(cand)
} else {
None
}
};
Ok(Self {
snapshot_path,
memory_mib,
vcpus,
layers,
delta_squashfs,
bundled_kernel,
})
}
pub fn from_oci(image_ref: &str) -> Result<Self, Error> {
Self::from_oci_with_policy(image_ref, PullPolicy::default())
}
pub fn from_oci_with_policy(
image_ref: &str,
policy: PullPolicy,
) -> Result<Self, Error> {
let snapshots_dir = default_snapshots_dir();
Self::from_oci_to_dir(image_ref, policy, &snapshots_dir, None)
}
pub fn from_oci_to_dir(
image_ref: &str,
policy: PullPolicy,
snapshots_dir: &Path,
name: Option<&str>,
) -> Result<Self, Error> {
let derived = name
.map(|s| s.to_owned())
.unwrap_or_else(|| crate::bake::snapshot_name_for_image(image_ref));
let snap_dir = snapshots_dir.join(&derived);
let cache_loadable = Self::from_snapshot(&snap_dir).is_ok();
match policy {
PullPolicy::Never => {
if cache_loadable {
return Self::from_snapshot(&snap_dir);
}
let restore_snap = snap_dir.join("restore.snap");
if restore_snap.is_file() {
return Err(Error::CacheInvalid(format!(
"snapshot present at {} but not loadable on this binary; \
rebake required (PullPolicy::Never won't auto-rebake)",
snap_dir.display()
)));
}
return Err(Error::CacheMiss(format!(
"no cached snapshot for {image_ref} at {} (PullPolicy::Never)",
snap_dir.display()
)));
}
PullPolicy::Missing if cache_loadable => {
return Self::from_snapshot(&snap_dir);
}
_ => {}
}
let root = repo_root_for_bake()?;
let request = crate::bake::BakeRequest {
image: image_ref.to_owned(),
name: name.map(|s| s.to_owned()),
runtime: "supermachine".to_owned(),
guest_port: 80,
memory_mib: 256,
vcpus: 1,
pull_policy: policy.as_bake_str().to_owned(),
snapshots_dir: snapshots_dir.to_path_buf(),
cmd_override: None,
extra_args: Vec::new(),
};
let bake_t0 = std::time::Instant::now();
crate::bake::run_push(&request, bake_t0, &root).map_err(map_bake_error)?;
Self::from_snapshot(&snap_dir)
}
pub fn builder(image_ref: impl Into<String>) -> OciImageBuilder {
OciImageBuilder::new(image_ref)
}
pub fn snapshot_path(&self) -> &Path {
&self.snapshot_path
}
pub fn memory_mib(&self) -> u32 {
self.memory_mib
}
pub fn vcpus(&self) -> u32 {
self.vcpus
}
pub fn start(&self, config: &VmConfig) -> Result<Vm, Error> {
Vm::start(self, config)
}
pub fn acquire(&self) -> Result<PooledVm<'_>, Error> {
let vm = Vm::start(self, &VmConfig::new())?;
Ok(PooledVm { vm: Some(vm), _image: std::marker::PhantomData })
}
pub fn acquire_with(&self, config: &VmConfig) -> Result<PooledVm<'_>, Error> {
let vm = Vm::start(self, config)?;
Ok(PooledVm { vm: Some(vm), _image: std::marker::PhantomData })
}
}
/// Guard around a [`Vm`] obtained from [`Image::acquire`] or
/// [`Image::acquire_with`]; the VM is stopped when the guard is dropped.
///
/// NOTE(review): despite the name, the acquire methods currently boot a fresh
/// VM for each guard rather than taking one from a pool.
pub struct PooledVm<'a> {
// `Some` from construction until `Drop` takes it out.
vm: Option<Vm>,
// Ties the guard to the lifetime of the `Image` it was acquired from.
_image: std::marker::PhantomData<&'a Image>,
}
impl<'a> std::ops::Deref for PooledVm<'a> {
    type Target = Vm;

    /// Borrows the guarded [`Vm`].
    ///
    /// # Panics
    ///
    /// If the VM has already been taken out, which only happens in `Drop`.
    fn deref(&self) -> &Vm {
        match &self.vm {
            Some(inner) => inner,
            None => panic!("PooledVm used after drop"),
        }
    }
}
impl<'a> std::ops::DerefMut for PooledVm<'a> {
    /// Mutably borrows the guarded [`Vm`].
    ///
    /// # Panics
    ///
    /// If the VM has already been taken out, which only happens in `Drop`.
    fn deref_mut(&mut self) -> &mut Vm {
        match &mut self.vm {
            Some(inner) => inner,
            None => panic!("PooledVm used after drop"),
        }
    }
}
impl<'a> Drop for PooledVm<'a> {
    /// Stops the guarded VM. Errors are ignored because `Drop` cannot
    /// report them.
    fn drop(&mut self) {
        match self.vm.take() {
            Some(inner) => {
                let _ = inner.stop();
            }
            None => {}
        }
    }
}
/// Builder that resolves an OCI image reference to a baked [`Image`], with
/// optional overrides forwarded to the bake step. Created by [`Image::builder`].
pub struct OciImageBuilder {
// OCI image reference to bake.
image: String,
// Snapshot directory name; derived from the image ref when `None`.
name: Option<String>,
// Cache / bake policy (default: `PullPolicy::Missing`).
pull_policy: PullPolicy,
// Bake-time guest memory in MiB (256 when unset).
memory_mib: Option<u32>,
// Bake-time vCPU count (1 when unset).
vcpus: Option<u32>,
// Guest port passed to the bake step (80 when unset).
guest_port: Option<u16>,
// Command override; JSON-encoded for the bake step.
cmd: Option<Vec<String>>,
// Environment variables, forwarded as `--env KEY=VALUE` bake arguments.
envs: Vec<(String, String)>,
// Snapshot root directory; `default_snapshots_dir()` when unset.
snapshots_dir: Option<PathBuf>,
}
impl OciImageBuilder {
pub fn new(image_ref: impl Into<String>) -> Self {
Self {
image: image_ref.into(),
name: None,
pull_policy: PullPolicy::default(),
memory_mib: None,
vcpus: None,
guest_port: None,
cmd: None,
envs: Vec::new(),
snapshots_dir: None,
}
}
pub fn with_vcpus(mut self, vcpus: u32) -> Self {
self.vcpus = Some(vcpus);
self
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub fn with_pull_policy(mut self, policy: PullPolicy) -> Self {
self.pull_policy = policy;
self
}
pub fn with_memory_mib(mut self, mib: u32) -> Self {
self.memory_mib = Some(mib);
self
}
pub fn with_guest_port(mut self, port: u16) -> Self {
self.guest_port = Some(port);
self
}
pub fn with_cmd<I, S>(mut self, cmd: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.cmd = Some(cmd.into_iter().map(Into::into).collect());
self
}
pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.envs.push((key.into(), value.into()));
self
}
pub fn with_snapshots_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.snapshots_dir = Some(dir.into());
self
}
pub fn build(self) -> Result<Image, Error> {
let snapshots_dir = self
.snapshots_dir
.unwrap_or_else(default_snapshots_dir);
let derived_name = self
.name
.clone()
.unwrap_or_else(|| crate::bake::snapshot_name_for_image(&self.image));
let snap_dir = snapshots_dir.join(&derived_name);
let cache_loadable = Image::from_snapshot(&snap_dir).is_ok();
match self.pull_policy {
PullPolicy::Never => {
if cache_loadable {
return Image::from_snapshot(&snap_dir);
}
let restore_snap = snap_dir.join("restore.snap");
if restore_snap.is_file() {
return Err(Error::CacheInvalid(format!(
"snapshot present at {} but not loadable on this binary; \
rebake required (PullPolicy::Never won't auto-rebake)",
snap_dir.display()
)));
}
return Err(Error::CacheMiss(format!(
"no cached snapshot for {} at {} (PullPolicy::Never)",
self.image,
snap_dir.display()
)));
}
_ => {}
}
let mut extra_args: Vec<String> = Vec::new();
for (k, v) in &self.envs {
extra_args.push("--env".to_owned());
extra_args.push(format!("{k}={v}"));
}
let cmd_override = match &self.cmd {
Some(argv) => Some(
serde_json::to_string(argv)
.map_err(|e| Error::Bake(format!("encode cmd: {e}")))?,
),
None => None,
};
let root = repo_root_for_bake()?;
let request = crate::bake::BakeRequest {
image: self.image.clone(),
name: self.name.clone(),
runtime: "supermachine".to_owned(),
guest_port: self.guest_port.unwrap_or(80),
memory_mib: self.memory_mib.unwrap_or(256),
vcpus: self.vcpus.unwrap_or(1),
pull_policy: self.pull_policy.as_bake_str().to_owned(),
snapshots_dir: snapshots_dir.clone(),
cmd_override,
extra_args,
};
let bake_t0 = std::time::Instant::now();
crate::bake::run_push(&request, bake_t0, &root).map_err(map_bake_error)?;
Image::from_snapshot(&snap_dir)
}
}
/// Root directory for baked snapshots.
///
/// Uses `$SUPERMACHINE_SNAPSHOTS` when set. Otherwise falls back to
/// `$HOME/.local/supermachine-snapshots`, with `.` standing in for an unset
/// `$HOME`.
fn default_snapshots_dir() -> PathBuf {
    match std::env::var_os("SUPERMACHINE_SNAPSHOTS") {
        Some(explicit) => PathBuf::from(explicit),
        None => {
            let home_dir = std::env::var_os("HOME")
                .map_or_else(|| PathBuf::from("."), PathBuf::from);
            home_dir.join(".local/supermachine-snapshots")
        }
    }
}
fn repo_root_for_bake() -> Result<PathBuf, Error> {
if let Some(root) = std::env::var_os("SUPERMACHINE_ROOT") {
return Ok(PathBuf::from(root));
}
let exe = std::env::current_exe()
.map_err(|e| Error::Bake(format!("current_exe: {e}")))?;
for ancestor in exe.ancestors() {
if ancestor.join("tools/supermachine-push").is_file() {
return Ok(ancestor.to_path_buf());
}
if ancestor.join("share/supermachine/kernel").is_file() {
return Ok(ancestor.to_path_buf());
}
}
std::env::current_dir().map_err(|e| Error::Bake(format!("current_dir: {e}")))
}
/// Classifies a bake failure message into an [`Error`] variant.
///
/// Messages containing (case-insensitively) `registry`, `manifest`,
/// `docker pull` or `auth` become [`Error::Network`]. Everything else becomes
/// [`Error::Bake`].
///
/// NOTE(review): this is a substring heuristic; `auth` also matches words
/// such as "author".
fn map_bake_error(msg: String) -> Error {
    const NETWORK_HINTS: [&str; 4] = ["registry", "manifest", "docker pull", "auth"];
    let lc = msg.to_ascii_lowercase();
    if NETWORK_HINTS.iter().any(|&hint| lc.contains(hint)) {
        Error::Network(msg)
    } else {
        // Snapshot timeouts, listener-readiness failures and all other
        // messages previously had separate branches that all produced
        // `Bake`; they are folded into this single case.
        Error::Bake(msg)
    }
}
/// Per-VM overrides applied by `Vm::start`; unset fields fall back to the
/// image metadata or built-in defaults.
#[derive(Debug, Clone, Default)]
pub struct VmConfig {
/// Guest memory in MiB; defaults to the image's `memory_mib`.
memory_mib: Option<u32>,
/// vCPU count; defaults to the image's `vcpus`.
vcpus: Option<u32>,
/// Host assets (e.g. kernel); defaults to `AssetPaths::discover()`.
assets: Option<AssetPaths>,
/// Directory for the per-VM socket files; defaults to `std::env::temp_dir()`.
vsock_mux_dir: Option<PathBuf>,
/// How long to wait for the snapshot restore; defaults to 10 seconds.
restore_timeout: Option<Duration>,
}
impl VmConfig {
    /// Creates a config with every override unset.
    pub fn new() -> Self {
        Default::default()
    }

    /// Overrides guest memory, in MiB.
    pub fn with_memory_mib(self, mib: u32) -> Self {
        Self { memory_mib: Some(mib), ..self }
    }

    /// Overrides the vCPU count.
    pub fn with_vcpus(self, vcpus: u32) -> Self {
        Self { vcpus: Some(vcpus), ..self }
    }

    /// Uses `assets` instead of discovering host assets.
    pub fn with_assets(self, assets: AssetPaths) -> Self {
        Self { assets: Some(assets), ..self }
    }

    /// Places the per-VM socket files in `dir` (created if missing).
    pub fn with_vsock_mux_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self { vsock_mux_dir: Some(dir.into()), ..self }
    }

    /// Overrides how long to wait for the snapshot restore.
    pub fn with_restore_timeout(self, timeout: Duration) -> Self {
        Self { restore_timeout: Some(timeout), ..self }
    }
}
/// A VM restored from an [`Image`] snapshot.
///
/// `Vm::stop` or dropping the value shuts the pool down and removes the
/// per-VM socket files.
pub struct Vm {
/// Warm pool backing this VM; `None` once shut down.
pool: Option<WarmPool>,
/// Host Unix socket passed to the VMM via `with_vsock_mux`.
vsock_mux_path: PathBuf,
/// Host Unix socket passed via `with_vsock_exec`; used for exec/control.
vsock_exec_path: PathBuf,
/// Socket directory created by `Vm::start` (removed on cleanup), if any.
own_vsock_mux_dir: Option<PathBuf>,
}
impl Vm {
    /// Boots a VM by restoring `image`'s snapshot with the overrides in
    /// `config`.
    ///
    /// The kernel is the image's bundled `kernel` file if present, otherwise
    /// the kernel from `config`'s assets (or [`AssetPaths::discover`]). Two
    /// per-VM Unix socket paths (mux and exec) are allocated in `config`'s
    /// socket directory (default: the system temp dir), which is created if
    /// missing.
    ///
    /// # Errors
    ///
    /// * [`Error::Assets`] if no kernel can be found.
    /// * [`Error::Io`] if the socket directory cannot be created.
    /// * [`Error::Vm`] if the warm pool fails to start or the restore fails
    ///   (default timeout: 10 s).
    ///
    /// On any failure after the socket directory is prepared, the partially
    /// started VM is torn down and its socket paths and created directory
    /// are removed.
    pub fn start(image: &Image, config: &VmConfig) -> Result<Vm, Error> {
        let assets = match &config.assets {
            Some(a) => a.clone(),
            None => AssetPaths::discover(),
        };
        let kernel: PathBuf = match (&image.bundled_kernel, &assets.kernel) {
            (Some(bundled), _) => bundled.clone(),
            (None, Some(discovered)) => discovered.clone(),
            (None, None) => {
                return Err(Error::Assets(
                    "no kernel found: snapshot dir has no bundled kernel and AssetPaths::discover() came up empty; set VmConfig::with_assets() or $SUPERMACHINE_ASSETS_DIR".to_owned(),
                ));
            }
        };

        let dir = config
            .vsock_mux_dir
            .clone()
            .unwrap_or_else(std::env::temp_dir);
        // Remember whether we created the directory so cleanup only removes
        // a directory this VM made.
        let own_dir = if dir.is_dir() {
            None
        } else {
            std::fs::create_dir_all(&dir)?;
            Some(dir.clone())
        };
        let mux_name = format!(
            "supermachine-vm-{}-{}.sock",
            std::process::id(),
            unique_suffix(),
        );
        let vsock_exec_path = dir.join(format!("{mux_name}-exec"));
        let vsock_mux_path = dir.join(mux_name);

        let memory_mib = config.memory_mib.unwrap_or(image.memory_mib);
        let vcpus = config.vcpus.unwrap_or(image.vcpus);
        let mut resources = VmResources::new()
            .with_kernel_path(kernel.to_string_lossy().to_string())
            .with_memory_mib(memory_mib as usize)
            .with_vcpus(vcpus)
            .with_cow_restore(true)
            .with_restore(image.snapshot_path.to_string_lossy().to_string())
            .with_vsock_mux(vsock_mux_path.to_string_lossy().to_string())
            .with_vsock_exec(vsock_exec_path.to_string_lossy().to_string());
        // Layers first, in metadata order, then the optional delta image.
        for device in image.layers.iter().chain(image.delta_squashfs.iter()) {
            resources = resources.with_block_device(device.to_string_lossy().to_string());
        }

        // Own the socket paths and directory *before* starting the pool. If
        // `WarmPool::start` or the restore below fails, `vm` is dropped and
        // `Drop for Vm` shuts the pool down (if started) and removes the
        // sockets and any directory we created, instead of leaking them.
        let mut vm = Vm {
            pool: None,
            vsock_mux_path,
            vsock_exec_path,
            own_vsock_mux_dir: own_dir,
        };
        let started = WarmPool::start(resources, RunOptions::default())?;
        let pool = vm.pool.insert(started);
        let timeout = config
            .restore_timeout
            .unwrap_or_else(|| Duration::from_secs(10));
        let _ = pool.restore_timeout(image.snapshot_path.to_string_lossy().to_string(), timeout)?;
        Ok(vm)
    }

    /// Host path of the vsock mux Unix socket.
    pub fn vsock_path(&self) -> &Path {
        &self.vsock_mux_path
    }

    /// Host path of the exec/control Unix socket.
    pub fn exec_path(&self) -> &Path {
        &self.vsock_exec_path
    }

    /// Spawns `argv` inside the guest with default exec settings.
    pub fn exec<I, S>(&self, argv: I) -> std::io::Result<crate::exec::ExecChild>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.exec_builder().argv(argv).spawn()
    }

    /// Returns an exec builder bound to this VM's exec socket.
    pub fn exec_builder(&self) -> crate::exec::ExecBuilder {
        crate::exec::ExecBuilder::new(self.vsock_exec_path.clone())
    }

    /// Writes `bytes` to `path` inside the guest.
    ///
    /// Runs `sh -c "cat > '<path>'"` and streams `bytes` to its stdin. The
    /// path is single-quote escaped for the shell.
    ///
    /// NOTE(review): if the spawned child exposes no stdin, nothing is
    /// written and the outcome depends on `cat`'s exit status.
    ///
    /// # Errors
    ///
    /// Spawn/stream errors, or an `Other` error if `cat` exits unsuccessfully.
    pub fn write_file(&self, path: &str, bytes: &[u8]) -> std::io::Result<()> {
        // Close the single quote, emit an escaped quote, and reopen it.
        let escaped = path.replace('\'', "'\\''");
        let cmd = format!("cat > '{escaped}'");
        let mut child = self
            .exec_builder()
            .argv(["sh", "-c", &cmd])
            .spawn()?;
        if let Some(mut stdin) = child.stdin() {
            stdin.write_all(bytes)?;
            stdin.close()?;
        }
        let status = child.wait()?;
        if !status.success() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("write_file({path}): cat exited with {:?}", status.code()),
            ));
        }
        Ok(())
    }

    /// Reads the whole file at `path` inside the guest via `cat`.
    ///
    /// # Errors
    ///
    /// Exec errors, or an `Other` error (including `cat`'s stderr) if `cat`
    /// exits unsuccessfully.
    pub fn read_file(&self, path: &str) -> std::io::Result<Vec<u8>> {
        // `--` ends option parsing so a path starting with `-` is read as a
        // file name instead of being parsed as a `cat` flag.
        let outcome = self
            .exec_builder()
            .argv(["cat", "--", path])
            .output()?;
        if !outcome.success() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!(
                    "read_file({path}): cat exited with {:?}, stderr: {}",
                    outcome.status.code(),
                    String::from_utf8_lossy(&outcome.stderr)
                ),
            ));
        }
        Ok(outcome.stdout)
    }

    /// Sends a `{"action": "signal", "signum": <signum>}` control message over
    /// the exec socket.
    pub fn workload_signal(&self, signum: i32) -> std::io::Result<()> {
        let body = serde_json::json!({
            "action": "signal",
            "signum": signum,
        });
        crate::exec::send_control(&self.vsock_exec_path, &body)
    }

    /// Opens a new connection to the vsock mux Unix socket.
    pub fn connect(&self) -> std::io::Result<UnixStream> {
        UnixStream::connect(&self.vsock_mux_path)
    }

    /// Listens on `127.0.0.1:host_port` and forwards each accepted TCP
    /// connection to the vsock mux socket on a background thread.
    ///
    /// Pass `host_port = 0` to let the OS choose; read the actual port from
    /// [`TcpForwarder::local_addr`].
    ///
    /// NOTE(review): `_guest_port` is currently ignored. Every connection goes
    /// to the mux socket; presumably the mux routes to the port the image was
    /// baked with. Confirm.
    pub fn expose_tcp(&self, host_port: u16, _guest_port: u16) -> std::io::Result<TcpForwarder> {
        let listener = TcpListener::bind(("127.0.0.1", host_port))?;
        let bound = listener.local_addr()?;
        // The accept loop relies on blocking `accept`.
        listener.set_nonblocking(false)?;
        let stop = Arc::new(AtomicBool::new(false));
        let stop_thread = stop.clone();
        let vsock_path = self.vsock_mux_path.clone();
        let handle = std::thread::Builder::new()
            .name(format!("supermachine-tcp-{host_port}"))
            .spawn(move || {
                accept_loop(listener, vsock_path, stop_thread);
            })
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
        Ok(TcpForwarder {
            stop,
            handle: Some(handle),
            bound,
        })
    }

    /// Shuts the pool down and removes the socket files, reporting a
    /// shutdown error.
    ///
    /// Even on error, the sockets are still cleaned up by `Drop`.
    pub fn stop(mut self) -> Result<(), Error> {
        if let Some(pool) = self.pool.take() {
            let _ = pool.shutdown()?;
        }
        self.cleanup_socket();
        Ok(())
    }

    /// Best-effort removal of both socket paths and, if this VM created it,
    /// the socket directory. `remove_dir` leaves a non-empty directory alone.
    fn cleanup_socket(&self) {
        let _ = std::fs::remove_file(&self.vsock_mux_path);
        let _ = std::fs::remove_file(&self.vsock_exec_path);
        if let Some(dir) = &self.own_vsock_mux_dir {
            let _ = std::fs::remove_dir(dir);
        }
    }
}
impl Drop for Vm {
    /// Best-effort teardown: shuts the pool down if it is still present
    /// (errors are discarded), then removes the socket files.
    fn drop(&mut self) {
        let _ = self.pool.take().map(|pool| pool.shutdown());
        self.cleanup_socket();
    }
}
/// Handle for a host TCP listener whose connections are forwarded into a VM
/// (see `Vm::expose_tcp`).
///
/// [`TcpForwarder::stop`] or dropping the handle stops the accept thread.
/// Connections that were already accepted run on their own threads and are
/// not joined here.
pub struct TcpForwarder {
    // Set to ask the accept loop to exit.
    stop: Arc<AtomicBool>,
    // Accept-loop thread; `None` once it has been joined.
    handle: Option<JoinHandle<()>>,
    // Address the listener is actually bound to.
    bound: SocketAddr,
}

impl TcpForwarder {
    /// Address the listener is bound to (useful when port 0 was requested).
    pub fn local_addr(&self) -> SocketAddr {
        self.bound
    }

    /// Stops accepting connections and joins the accept thread.
    pub fn stop(mut self) {
        self.shutdown();
    }

    /// Sets the stop flag, then makes one throwaway connection so the
    /// blocking `accept` returns and the loop observes the flag, then joins
    /// the accept thread.
    ///
    /// This is a no-op after the first call. `stop()` runs it and `Drop`
    /// then runs it again; without the guard, the second pass would connect
    /// to a port this forwarder no longer owns.
    fn shutdown(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.stop.store(true, Ordering::SeqCst);
            let _ = TcpStream::connect_timeout(&self.bound, Duration::from_millis(200));
            let _ = handle.join();
        }
    }
}

impl Drop for TcpForwarder {
    fn drop(&mut self) {
        self.shutdown();
    }
}
fn accept_loop(listener: TcpListener, vsock_path: PathBuf, stop: Arc<AtomicBool>) {
for incoming in listener.incoming() {
if stop.load(Ordering::SeqCst) {
break;
}
let tcp = match incoming {
Ok(s) => s,
Err(_) => continue,
};
let vsock = vsock_path.clone();
std::thread::Builder::new()
.name("supermachine-tcp-conn".into())
.spawn(move || {
if let Err(e) = splice_tcp_to_unix(tcp, &vsock) {
eprintln!("supermachine: tcp forward: {e}");
}
})
.ok();
}
}
/// Connects to the Unix socket at `vsock_path` and copies bytes in both
/// directions between it and `tcp` until both directions finish.
///
/// Client→guest runs on a helper thread; guest→client runs on the calling
/// thread, which is already a dedicated per-connection thread.
///
/// # Errors
///
/// Connecting to the Unix socket, cloning either stream, or spawning the
/// helper thread failed. Errors while copying are not reported.
fn splice_tcp_to_unix(tcp: TcpStream, vsock_path: &Path) -> std::io::Result<()> {
    let unix = UnixStream::connect(vsock_path)?;
    let tcp_w = tcp.try_clone()?;
    let unix_w = unix.try_clone()?;
    let client_to_guest = std::thread::Builder::new()
        .name("supermachine-tcp-c2g".into())
        .spawn(move || {
            let _ = pump(tcp, unix_w);
        })?;
    // Pumping this direction inline (instead of spawning a second thread)
    // saves a thread per connection. It also removes the failure mode where
    // the first helper started but the second spawn failed, which left a
    // half-forwarded connection running.
    let _ = pump(unix, tcp_w);
    let _ = client_to_guest.join();
    Ok(())
}
/// Copies bytes from `r` to `w` until `r` reaches EOF or either side fails,
/// then half-closes `w` so its peer sees EOF.
///
/// The write half is shut down on *every* exit path, including read and write
/// errors. Otherwise, after e.g. a reset on one side of a spliced connection,
/// the peer would keep waiting for data that will never arrive.
///
/// # Errors
///
/// The first non-`Interrupted` read error, or any write error. A failure to
/// shut down the writer is ignored.
fn pump<R, W>(mut r: R, mut w: W) -> std::io::Result<()>
where
    R: Read,
    W: Write + Shutdownable,
{
    let mut buf = [0u8; 16 * 1024];
    let result = loop {
        match r.read(&mut buf) {
            Ok(0) => break Ok(()),
            Ok(n) => {
                if let Err(e) = w.write_all(&buf[..n]) {
                    break Err(e);
                }
            }
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => break Err(e),
        }
    };
    let _ = w.shutdown_write();
    result
}

/// Streams whose write direction can be closed independently of the read
/// direction.
trait Shutdownable {
    /// Shuts down the write half so the peer observes EOF.
    fn shutdown_write(&mut self) -> std::io::Result<()>;
}

impl Shutdownable for TcpStream {
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        TcpStream::shutdown(self, std::net::Shutdown::Write)
    }
}

impl Shutdownable for UnixStream {
    fn shutdown_write(&mut self) -> std::io::Result<()> {
        UnixStream::shutdown(self, std::net::Shutdown::Write)
    }
}
/// Returns a value intended to differ between calls, used to name per-VM
/// socket files.
///
/// It is wall-clock nanoseconds since the Unix epoch (0 if the clock reads
/// before the epoch) plus a process-wide call counter, with wrapping
/// addition. This makes collisions unlikely but is not a strict uniqueness
/// guarantee.
fn unique_suffix() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};
    static CALLS: AtomicU64 = AtomicU64::new(0);
    let since_epoch_ns = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    let ticket = CALLS.fetch_add(1, Ordering::Relaxed);
    since_epoch_ns.wrapping_add(ticket)
}