use std::cell::RefCell;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
mod layer;
mod oci;
mod registry;
pub(crate) mod squashfs;
use layer::*;
use oci::*;
use registry::*;
/// Caller-facing description of one image bake.
///
/// `run_push` borrows it field-by-field into a `BakePlan` via `BakePlan::from_request`.
pub struct BakeRequest {
    /// Image reference to bake. The `oci-layout:<dir>` and `oci-archive:<file>`
    /// prefixes select local OCI sources in `select_image_source`.
    pub image: String,
    /// Snapshot name. When `None`, `BakePlan::snapshot_name` derives one from
    /// the image via `sanitized_snapshot_name`.
    pub name: Option<String>,
    /// Runtime name. `"supermachine"` enables the native bake path in `run_push`.
    pub runtime: String,
    /// Guest port, forwarded as `--port` to `tools/supermachine-push`.
    pub guest_port: u16,
    /// Guest memory in MiB, forwarded as `--memory`.
    pub memory_mib: u32,
    /// Guest vCPU count (recorded on the `supermachine.bake` tracing span).
    pub vcpus: u32,
    /// Pull policy string, echoed in trace output. The external push tool is
    /// always invoked with `--pull never`.
    /// NOTE(review): presumably the policy is applied during image resolution; confirm in `resolve_image`.
    pub pull_policy: String,
    /// Directory holding snapshot directories and the per-name bake locks.
    pub snapshots_dir: PathBuf,
    /// Optional command override.
    /// NOTE(review): not read in this chunk; confirm semantics at its use sites.
    pub cmd_override: Option<String>,
    /// Extra arguments appended verbatim to the `supermachine-push` command.
    pub extra_args: Vec<String>,
    /// Target platform such as `linux/arm64`. `BakePlan::arch` uses the segment
    /// after the first `/` and falls back to `arm64`.
    pub platform: String,
}
/// Options for a warm-up step that runs alongside a bake.
///
/// NOTE(review): no field is read in this chunk; the descriptions below are
/// inferred from names and types only. Confirm them against the consumer.
pub struct PipelinedWarmup<'a> {
    pub warm_dir: PathBuf,
    pub warm_tag: String,
    /// One-shot hook receiving the worker's vsock socket paths. Presumably an
    /// `Err` aborts the warm-up; verify.
    pub callback: Box<dyn FnOnce(&PipelinedWarmupContext) -> Result<(), String> + Send + 'a>,
    pub keep_alive: bool,
    pub skip_warm_snapshot: bool,
    pub use_pre_exec_trigger: bool,
}
/// A running worker process, plus the control channel and socket paths used to talk to it.
pub struct BakedWorker {
    /// The worker child process. `Child` has no kill-on-drop, so call `shutdown`.
    pub child: std::process::Child,
    pub vsock_mux_path: PathBuf,
    pub vsock_exec_path: PathBuf,
    pub control_path: PathBuf,
    /// Write side of the control connection; `shutdown` sends `QUIT` here.
    pub control_writer: std::os::unix::net::UnixStream,
    pub control_reader: std::os::unix::net::UnixStream,
    /// Per-worker socket directory, removed by `shutdown`.
    pub socks_dir: PathBuf,
    pub last_restore_path: PathBuf,
}
impl std::fmt::Debug for BakedWorker {
    // Hand-written: the child is shown only by PID and the streams are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BakedWorker")
            .field("child_pid", &self.child.id())
            .field("vsock_mux_path", &self.vsock_mux_path)
            .field("vsock_exec_path", &self.vsock_exec_path)
            .field("control_path", &self.control_path)
            .field("socks_dir", &self.socks_dir)
            .field("last_restore_path", &self.last_restore_path)
            .finish()
    }
}
impl BakedWorker {
    /// Stops the worker and removes its socket directory. This is best-effort;
    /// all errors are ignored.
    ///
    /// Sends `QUIT` on the control channel, then polls every 20 ms for up to
    /// 5 s for the child to exit. If it is still running, or its status cannot
    /// be read, the child is killed and reaped.
    pub fn shutdown(mut self) {
        use std::io::Write;
        let _ = writeln!(&mut self.control_writer, "QUIT");
        let _ = self.control_writer.flush();
        let kill_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            match self.child.try_wait() {
                Ok(Some(_)) => break,
                Ok(None) if std::time::Instant::now() < kill_deadline => {
                    std::thread::sleep(std::time::Duration::from_millis(20));
                }
                _ => {
                    let _ = self.child.kill();
                    let _ = self.child.wait();
                    break;
                }
            }
        }
        let _ = std::fs::remove_dir_all(&self.socks_dir);
    }
}
/// A slot holding at most one pre-warmed `BakedWorker`.
///
/// Any worker still stashed when the `WarmStash` is dropped is shut down.
#[derive(Debug)]
pub struct WarmStash {
    pub inner: std::sync::Mutex<Option<BakedWorker>>,
}
impl WarmStash {
    /// Creates a stash, optionally pre-filled with `bw`.
    pub fn new(bw: Option<BakedWorker>) -> Self {
        Self {
            inner: std::sync::Mutex::new(bw),
        }
    }
    /// Removes and returns the stashed worker, if any.
    ///
    /// A poisoned lock is recovered rather than treated as empty. An
    /// `Option::take` cannot leave the slot half-updated, and reporting "empty"
    /// would strand the worker: nothing would ever shut it down, and its child
    /// process keeps running because `Child` is not killed on drop.
    pub fn take(&self) -> Option<BakedWorker> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take()
    }
}
impl Drop for WarmStash {
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so `get_mut` needs no locking.
        // Recover from poison so a panic elsewhere cannot leak the worker process.
        let slot = self
            .inner
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(bw) = slot.take() {
            bw.shutdown();
        }
    }
}
/// Socket paths handed to a `PipelinedWarmup` callback.
pub struct PipelinedWarmupContext {
    pub vsock_mux_path: PathBuf,
    pub vsock_exec_path: PathBuf,
}
/// Backend that can locate, fetch, describe and export a container image for
/// one target architecture (`arch()`).
///
/// Implementations in this chunk: Docker CLI, `container` CLI, direct registry,
/// on-disk OCI layout, and OCI archive.
trait ImageSource {
    /// Architecture of an already-present local copy of `image`, if the
    /// backend has a local store. The registry and OCI sources always return `None`.
    fn local_arch(&self, image: &str) -> Option<String>;
    /// Makes `image` available. `force_refresh` is only forwarded by the
    /// registry source; the CLI sources ignore it, and the OCI sources are no-ops.
    fn pull(&self, image: &str, force_refresh: bool) -> Result<(), String>;
    /// Returns one image record as JSON. For the CLI sources this is the first
    /// element of `image inspect` output.
    fn inspect(&self, image: &str) -> Result<serde_json::Value, String>;
    /// Materializes the image's contents and returns the directory holding them.
    /// This may be under `work_dir` or an existing/cached location, depending on the backend.
    fn save(&self, image: &str, work_dir: &Path) -> Result<PathBuf, String>;
    /// Target architecture string (e.g. `arm64`), used as `linux/<arch>`.
    fn arch(&self) -> &str;
}
/// `ImageSource` that shells out to the `docker` CLI.
struct DockerImageSource {
    arch: String,
}
/// Memoized CLI inspect results for one `(image, arch)` key. Each slot stores
/// the value together with the `Instant` it was fetched, for TTL checks.
#[derive(Default)]
struct DockerInspectCacheEntry {
    /// Cached `local_arch` result. A cached `None` (image absent) is also kept.
    local_arch: Option<(Option<String>, Instant)>,
    /// Cached first record of `image inspect`.
    inspect: Option<(serde_json::Value, Instant)>,
}
/// Process-wide memo of `docker image inspect` results, keyed by `(image, arch)`.
///
/// Lazily created on first use; callers check entry ages against
/// `docker_inspect_cache_ttl()`.
fn docker_inspect_cache(
) -> &'static std::sync::Mutex<std::collections::HashMap<(String, String), DockerInspectCacheEntry>>
{
    static DOCKER_MEMO: OnceLock<
        std::sync::Mutex<std::collections::HashMap<(String, String), DockerInspectCacheEntry>>,
    > = OnceLock::new();
    // An empty map behind a fresh mutex is exactly `Default::default()`.
    DOCKER_MEMO.get_or_init(Default::default)
}
/// How long memoized image-inspect results stay valid.
///
/// Read from `SUPERMACHINE_DOCKER_INSPECT_CACHE_SECS` on every call. The value
/// defaults to 60 s when the variable is unset or not a `u64`. Zero disables
/// caching (callers check `is_zero()`).
fn docker_inspect_cache_ttl() -> Duration {
    const DEFAULT_TTL_SECS: u64 = 60;
    let seconds = match std::env::var("SUPERMACHINE_DOCKER_INSPECT_CACHE_SECS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_TTL_SECS),
        Err(_) => DEFAULT_TTL_SECS,
    };
    Duration::from_secs(seconds)
}
/// `ImageSource` backed by the local Docker daemon via the `docker` CLI.
///
/// `local_arch` and `inspect` results are memoized per `(image, arch)` in
/// `docker_inspect_cache()` for `docker_inspect_cache_ttl()`. A zero TTL
/// bypasses both reading and writing the cache. `pull` drops the memo entry
/// for the pulled image whether or not the pull succeeded.
impl ImageSource for DockerImageSource {
    fn arch(&self) -> &str {
        &self.arch
    }
    /// Architecture of the locally stored image, from
    /// `docker image inspect --format {{.Architecture}} <image>`.
    ///
    /// Returns `None` if the command fails or prints nothing / `<no value>`.
    /// That `None` is cached like any other result.
    fn local_arch(&self, image: &str) -> Option<String> {
        let key = (image.to_owned(), self.arch.clone());
        let ttl = docker_inspect_cache_ttl();
        if !ttl.is_zero() {
            if let Ok(g) = docker_inspect_cache().lock() {
                if let Some(entry) = g.get(&key) {
                    if let Some((val, ts)) = &entry.local_arch {
                        if ts.elapsed() < ttl {
                            return val.clone();
                        }
                    }
                }
            }
        }
        let mut cmd = Command::new("docker");
        cmd.arg("image")
            .arg("inspect")
            .arg("--format")
            .arg("{{.Architecture}}")
            .arg(image)
            .stderr(Stdio::null());
        let result = command_output(cmd, "docker image inspect architecture")
            .ok()
            .map(|s| s.trim().to_owned())
            .filter(|s| !s.is_empty() && s != "<no value>");
        if !ttl.is_zero() {
            if let Ok(mut g) = docker_inspect_cache().lock() {
                g.entry(key).or_default().local_arch = Some((result.clone(), Instant::now()));
            }
        }
        result
    }
    /// `docker pull --platform=linux/<arch> <image>` with stdout discarded.
    /// `_force_refresh` is ignored: `docker pull` always contacts the registry.
    /// NOTE(review): the ignore is inferred from the unused parameter; confirm intent.
    fn pull(&self, image: &str, _force_refresh: bool) -> Result<(), String> {
        let mut cmd = Command::new("docker");
        cmd.arg("pull")
            .arg(format!("--platform=linux/{}", self.arch))
            .arg(image)
            .stdout(Stdio::null());
        let r = run_status(cmd, "docker pull");
        // Invalidate memoized inspect data even on failure; a partial pull may
        // still have changed what the daemon reports.
        if let Ok(mut g) = docker_inspect_cache().lock() {
            g.remove(&(image.to_owned(), self.arch.clone()));
        }
        r
    }
    /// First record of `docker image inspect <image>` (a JSON array), memoized.
    /// Errors if the output is not JSON or the array is empty.
    fn inspect(&self, image: &str) -> Result<serde_json::Value, String> {
        let key = (image.to_owned(), self.arch.clone());
        let ttl = docker_inspect_cache_ttl();
        if !ttl.is_zero() {
            if let Ok(g) = docker_inspect_cache().lock() {
                if let Some(entry) = g.get(&key) {
                    if let Some((val, ts)) = &entry.inspect {
                        if ts.elapsed() < ttl {
                            return Ok(val.clone());
                        }
                    }
                }
            }
        }
        let mut inspect_cmd = Command::new("docker");
        inspect_cmd.arg("image").arg("inspect").arg(image);
        let inspect = command_output(inspect_cmd, "docker image inspect")?;
        let inspect_json: serde_json::Value = serde_json::from_str(&inspect)
            .map_err(|e| format!("docker image inspect JSON: {e}"))?;
        let record = inspect_json
            .as_array()
            .and_then(|a| a.first())
            .cloned()
            .ok_or_else(|| format!("docker image inspect returned no records for {image}"))?;
        if !ttl.is_zero() {
            if let Ok(mut g) = docker_inspect_cache().lock() {
                g.entry(key).or_default().inspect = Some((record.clone(), Instant::now()));
            }
        }
        Ok(record)
    }
    /// Runs `docker save --platform=linux/<arch> <image> -o <work_dir>/image.tar`,
    /// extracts the tarball into `<work_dir>/_save`, deletes the tarball, and
    /// returns `<work_dir>/_save`.
    ///
    /// If extraction fails, `image.tar` is left in `work_dir`. NOTE(review):
    /// presumably the caller removes `work_dir`; confirm.
    fn save(&self, image: &str, work_dir: &Path) -> Result<PathBuf, String> {
        let save_tar = work_dir.join("image.tar");
        let mut save = Command::new("docker");
        save.arg("save")
            .arg(format!("--platform=linux/{}", self.arch))
            .arg(image)
            .arg("-o")
            .arg(&save_tar)
            .stderr(Stdio::null());
        run_status(save, "docker save")?;
        let save_dir = work_dir.join("_save");
        std::fs::create_dir_all(&save_dir)
            .map_err(|e| format!("create save dir {}: {e}", save_dir.display()))?;
        oci::extract_tar_archive(&save_tar, &save_dir)?;
        let _ = std::fs::remove_file(&save_tar);
        Ok(save_dir)
    }
}
/// `ImageSource` that shells out to the `container` CLI.
/// NOTE(review): presumably Apple's `container` tool; confirm.
struct ContainerImageSource {
    arch: String,
}
/// Process-wide memo of `container image inspect` results, keyed by `(image, arch)`.
///
/// Kept separate from the Docker memo so the two CLIs never share entries.
/// Entry ages are checked against `docker_inspect_cache_ttl()` by callers.
fn container_inspect_cache(
) -> &'static std::sync::Mutex<std::collections::HashMap<(String, String), DockerInspectCacheEntry>>
{
    static CONTAINER_MEMO: OnceLock<
        std::sync::Mutex<std::collections::HashMap<(String, String), DockerInspectCacheEntry>>,
    > = OnceLock::new();
    CONTAINER_MEMO.get_or_init(Default::default)
}
/// `ImageSource` backed by the `container` CLI. It mirrors `DockerImageSource`,
/// but its memo lives in `container_inspect_cache()`.
///
/// The TTL comes from the same `docker_inspect_cache_ttl()` setting, and a
/// zero TTL disables the memo.
impl ImageSource for ContainerImageSource {
    fn arch(&self) -> &str {
        &self.arch
    }
    /// Architecture of the locally stored image, from
    /// `container image inspect --format {{.Architecture}} <image>`.
    /// Returns `None` on failure or empty / `<no value>` output; that `None` is cached.
    fn local_arch(&self, image: &str) -> Option<String> {
        let key = (image.to_owned(), self.arch.clone());
        let ttl = docker_inspect_cache_ttl();
        if !ttl.is_zero() {
            if let Ok(g) = container_inspect_cache().lock() {
                if let Some(entry) = g.get(&key) {
                    if let Some((val, ts)) = &entry.local_arch {
                        if ts.elapsed() < ttl {
                            return val.clone();
                        }
                    }
                }
            }
        }
        let mut cmd = Command::new("container");
        cmd.arg("image")
            .arg("inspect")
            .arg("--format")
            .arg("{{.Architecture}}")
            .arg(image)
            .stderr(Stdio::null());
        let result = command_output(cmd, "container image inspect architecture")
            .ok()
            .map(|s| s.trim().to_owned())
            .filter(|s| !s.is_empty() && s != "<no value>");
        if !ttl.is_zero() {
            if let Ok(mut g) = container_inspect_cache().lock() {
                g.entry(key).or_default().local_arch = Some((result.clone(), Instant::now()));
            }
        }
        result
    }
    /// `container image pull --platform=linux/<arch> <image>` with stdout
    /// discarded. `_force_refresh` is unused. The memo entry is dropped
    /// regardless of outcome.
    fn pull(&self, image: &str, _force_refresh: bool) -> Result<(), String> {
        let mut cmd = Command::new("container");
        cmd.arg("image")
            .arg("pull")
            .arg(format!("--platform=linux/{}", self.arch))
            .arg(image)
            .stdout(Stdio::null());
        let r = run_status(cmd, "container image pull");
        if let Ok(mut g) = container_inspect_cache().lock() {
            g.remove(&(image.to_owned(), self.arch.clone()));
        }
        r
    }
    /// First record of `container image inspect <image>` (a JSON array), memoized.
    fn inspect(&self, image: &str) -> Result<serde_json::Value, String> {
        let key = (image.to_owned(), self.arch.clone());
        let ttl = docker_inspect_cache_ttl();
        if !ttl.is_zero() {
            if let Ok(g) = container_inspect_cache().lock() {
                if let Some(entry) = g.get(&key) {
                    if let Some((val, ts)) = &entry.inspect {
                        if ts.elapsed() < ttl {
                            return Ok(val.clone());
                        }
                    }
                }
            }
        }
        let mut inspect_cmd = Command::new("container");
        inspect_cmd.arg("image").arg("inspect").arg(image);
        let inspect = command_output(inspect_cmd, "container image inspect")?;
        let inspect_json: serde_json::Value = serde_json::from_str(&inspect)
            .map_err(|e| format!("container image inspect JSON: {e}"))?;
        let record = inspect_json
            .as_array()
            .and_then(|a| a.first())
            .cloned()
            .ok_or_else(|| format!("container image inspect returned no records for {image}"))?;
        if !ttl.is_zero() {
            if let Ok(mut g) = container_inspect_cache().lock() {
                g.entry(key).or_default().inspect = Some((record.clone(), Instant::now()));
            }
        }
        Ok(record)
    }
    /// Runs `container image save --platform=linux/<arch> -o <work_dir>/image.tar <image>`.
    /// Unlike docker, the `-o` flag comes before the image argument here.
    /// The tarball is extracted into `<work_dir>/_save` and then deleted, and
    /// that directory is returned.
    fn save(&self, image: &str, work_dir: &Path) -> Result<PathBuf, String> {
        let save_tar = work_dir.join("image.tar");
        let mut save = Command::new("container");
        save.arg("image")
            .arg("save")
            .arg(format!("--platform=linux/{}", self.arch))
            .arg("-o")
            .arg(&save_tar)
            .arg(image)
            .stderr(Stdio::null());
        run_status(save, "container image save")?;
        let save_dir = work_dir.join("_save");
        std::fs::create_dir_all(&save_dir)
            .map_err(|e| format!("create save dir {}: {e}", save_dir.display()))?;
        oci::extract_tar_archive(&save_tar, &save_dir)?;
        let _ = std::fs::remove_file(&save_tar);
        Ok(save_dir)
    }
}
/// An image reference split into its registry parts.
/// NOTE(review): produced by `parse_registry_image_ref` (not in this chunk);
/// `reference` is presumably a tag or digest. Confirm there.
#[derive(Clone)]
struct RegistryImageRef {
    registry: String,
    repository: String,
    reference: String,
}
/// Username/password pair for registry basic auth.
///
/// Deliberately has no `Debug` derive, which keeps `pass` out of `{:?}` output.
#[derive(Clone)]
struct RegistryCreds {
    user: String,
    pass: String,
}
impl RegistryCreds {
    /// HTTP `Authorization` header value: `Basic <base64("user:pass")>`.
    fn basic_header_value(&self) -> String {
        let user_colon_pass = [self.user.as_str(), self.pass.as_str()].join(":");
        let encoded = b64_encode(user_colon_pass.as_bytes());
        format!("Basic {encoded}")
    }
}
/// Looks up basic-auth credentials for `registry` in the Docker CLI config.
///
/// Reads `$DOCKER_CONFIG/config.json`, falling back to `~/.docker/config.json`.
/// It tries the `auths` keys Docker commonly writes for a registry: the bare
/// host and its `https://` forms, with no suffix and with `/`, `/v1/` and `/v2/`.
/// For Docker Hub, all three hub aliases are tried as well.
///
/// The first entry whose `auth` field base64-decodes to `user:pass` wins.
/// Entries without a usable `auth` are skipped and the search continues; one
/// example is the `{}` entry written when a credential store is configured.
///
/// Returns `None` if the config is missing or unparsable, or no entry yields
/// credentials. Credential helpers (`credsStore` / `credHelpers`) are not consulted.
fn docker_config_auth(registry: &str) -> Option<RegistryCreds> {
    /// Decodes one `auths` entry's base64 `auth` field into `user:pass` credentials.
    fn creds_from_entry(entry: &serde_json::Value) -> Option<RegistryCreds> {
        let auth_b64 = entry.get("auth")?.as_str()?;
        let decoded = b64_decode(auth_b64.trim())?;
        let joined = String::from_utf8(decoded).ok()?;
        let (user, pass) = joined.split_once(':')?;
        Some(RegistryCreds {
            user: user.to_owned(),
            pass: pass.to_owned(),
        })
    }

    let config_path = std::env::var_os("DOCKER_CONFIG")
        .map(|d| PathBuf::from(d).join("config.json"))
        .unwrap_or_else(|| home_join(".docker/config.json"));
    let text = std::fs::read_to_string(&config_path).ok()?;
    let json: serde_json::Value = serde_json::from_str(&text).ok()?;
    let auths = json.get("auths")?.as_object()?;
    let docker_hub_aliases = ["docker.io", "index.docker.io", "registry-1.docker.io"];
    let is_docker_hub = docker_hub_aliases.contains(&registry);
    let mut keys: Vec<String> = vec![
        registry.to_string(),
        format!("https://{registry}"),
        format!("https://{registry}/"),
        format!("https://{registry}/v1/"),
        format!("https://{registry}/v2/"),
    ];
    if is_docker_hub {
        // Docker Hub credentials may be stored under any alias. The legacy
        // `https://index.docker.io/v1/` key is the common one.
        for alias in &docker_hub_aliases {
            keys.push(format!("https://{alias}/v1/"));
            keys.push(format!("https://{alias}/v2/"));
            keys.push(format!("https://{alias}"));
            keys.push((*alias).to_string());
        }
    }
    // Candidate keys are tried in order. A present-but-unusable entry does not
    // end the search.
    keys.iter()
        .filter_map(|key| auths.get(key))
        .find_map(creds_from_entry)
}
/// Decodes standard (RFC 4648, `+/` alphabet) base64.
///
/// Input stops at the first `=` and anything after it is ignored. Spaces,
/// tabs, `\r` and `\n` are skipped. Any other byte outside the alphabet yields
/// `None`. Leftover bits at the end are discarded without validation.
fn b64_decode(s: &str) -> Option<Vec<u8>> {
    /// Maps one alphabet byte to its 6-bit value.
    fn sextet(c: u8) -> Option<u32> {
        match c {
            b'A'..=b'Z' => Some(u32::from(c - b'A')),
            b'a'..=b'z' => Some(u32::from(c - b'a') + 26),
            b'0'..=b'9' => Some(u32::from(c - b'0') + 52),
            b'+' => Some(62),
            b'/' => Some(63),
            _ => None,
        }
    }

    let mut decoded = Vec::with_capacity(s.len() / 4 * 3);
    let mut acc: u32 = 0;
    let mut pending_bits: u32 = 0;
    for c in s.bytes().take_while(|&c| c != b'=') {
        if matches!(c, b'\n' | b'\r' | b' ' | b'\t') {
            continue;
        }
        acc = (acc << 6) | sextet(c)?;
        pending_bits += 6;
        if pending_bits >= 8 {
            pending_bits -= 8;
            decoded.push((acc >> pending_bits) as u8);
            // Keep only the bits not yet emitted (at most 5), so `acc` never overflows.
            acc &= (1u32 << pending_bits) - 1;
        }
    }
    Some(decoded)
}
/// Encodes `bytes` as standard (RFC 4648) base64, padded with `=`.
fn b64_encode(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut encoded = String::with_capacity(((bytes.len() + 2) / 3) * 4);
    for group in bytes.chunks(3) {
        // Pack up to three bytes big-endian into 24 bits; missing bytes are zero.
        let triple = group
            .iter()
            .chain(std::iter::repeat(&0u8))
            .take(3)
            .fold(0u32, |acc, &b| (acc << 8) | u32::from(b));
        // 1 input byte -> 2 symbols, 2 -> 3, 3 -> 4; the rest of the quad is padding.
        let symbols = group.len() + 1;
        for slot in 0..4u32 {
            if (slot as usize) < symbols {
                let index = (triple >> (18 - 6 * slot)) & 0x3f;
                encoded.push(char::from(ALPHABET[index as usize]));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
/// `ImageSource` that fetches directly from an OCI/Docker registry into an
/// on-disk OCI layout, without a local container runtime.
struct RegistryImageSource {
    image: RegistryImageRef,
    /// Lazily fetched layout, shared by `pull`, `inspect` and `save`.
    /// It is a `RefCell` because `ImageSource` methods take `&self`; this also
    /// makes the type `!Sync`.
    cached_layout: RefCell<Option<RegistryLayoutCache>>,
    arch: String,
}
/// A fetched OCI layout and the temp directory that owns it. `work_dir` is
/// removed when the owning `RegistryImageSource` is dropped.
struct RegistryLayoutCache {
    work_dir: PathBuf,
    /// `<work_dir>/_oci`.
    layout: PathBuf,
}
impl RegistryImageSource {
    /// Returns the path of the fetched OCI layout, fetching it on first use.
    ///
    /// The fetch goes into a fresh temp dir (`<tmp>/_oci`), which is deleted
    /// again if the fetch fails. Once a layout is cached, later calls return it
    /// directly and `force_refresh` has no effect.
    fn cached_layout(&self, force_refresh: bool) -> Result<PathBuf, String> {
        let already_fetched = self
            .cached_layout
            .borrow()
            .as_ref()
            .map(|cache| cache.layout.clone());
        if let Some(layout) = already_fetched {
            return Ok(layout);
        }
        let scratch = temp_work_dir("supermachine-registry-layout")?;
        let layout = scratch.join("_oci");
        if let Err(err) =
            fetch_registry_to_oci_layout(&self.image, &self.arch, &layout, force_refresh)
        {
            let _ = std::fs::remove_dir_all(&scratch);
            return Err(err);
        }
        *self.cached_layout.borrow_mut() = Some(RegistryLayoutCache {
            work_dir: scratch,
            layout: layout.clone(),
        });
        Ok(layout)
    }
}
impl Drop for RegistryImageSource {
    /// Best-effort removal of the temp directory backing the cached layout, if one was fetched.
    fn drop(&mut self) {
        let Some(cache) = self.cached_layout.get_mut().take() else {
            return;
        };
        let _ = std::fs::remove_dir_all(&cache.work_dir);
    }
}
impl ImageSource for RegistryImageSource {
    fn arch(&self) -> &str {
        self.arch.as_str()
    }
    /// Always `None`: a registry source has no local image store.
    fn local_arch(&self, _image: &str) -> Option<String> {
        None
    }
    /// Fetches (or reuses) the cached OCI layout. The image name argument is
    /// unused; the source was bound to its image at construction.
    fn pull(&self, _image: &str, force_refresh: bool) -> Result<(), String> {
        let _layout = self.cached_layout(force_refresh)?;
        Ok(())
    }
    /// Inspects the cached layout, fetching it first if needed.
    fn inspect(&self, image: &str) -> Result<serde_json::Value, String> {
        inspect_oci_layout(image, &self.cached_layout(false)?, &self.arch)
    }
    /// Returns the cached layout when present. Otherwise it fetches into
    /// `<work_dir>/_oci` without caching the result.
    fn save(&self, _image: &str, work_dir: &Path) -> Result<PathBuf, String> {
        let cached = self
            .cached_layout
            .borrow()
            .as_ref()
            .map(|cache| cache.layout.clone());
        match cached {
            Some(layout) => Ok(layout),
            None => {
                let layout = work_dir.join("_oci");
                fetch_registry_to_oci_layout(&self.image, &self.arch, &layout, false)?;
                Ok(layout)
            }
        }
    }
}
/// Minimal HTTP response as seen by the registry client.
/// NOTE(review): not constructed in this chunk. `headers` is presumably the
/// raw header block as one string; confirm in the registry module.
struct HttpResponse {
    status: u16,
    headers: String,
    body: Vec<u8>,
}
/// A resolved registry manifest and its config blob, plus fetch statistics.
/// NOTE(review): not constructed in this chunk; field meanings are inferred
/// from names (digest + raw bytes + parsed JSON; cache hit/download counters;
/// reference-cache age and TTL in milliseconds). Verify against the producer.
struct RegistryManifest {
    manifest_digest: String,
    manifest_bytes: Vec<u8>,
    manifest: serde_json::Value,
    config_digest: String,
    config_bytes: Vec<u8>,
    blob_cache_hits: usize,
    blob_downloads: usize,
    ref_cache_hit: bool,
    ref_cache_age_ms: Option<u128>,
    ref_cache_ttl_ms: u128,
}
/// `ImageSource` over an existing on-disk OCI image layout (`oci-layout:<dir>`).
struct OciLayoutImageSource {
    path: PathBuf,
    arch: String,
}
impl ImageSource for OciLayoutImageSource {
    fn arch(&self) -> &str {
        self.arch.as_str()
    }
    /// No local runtime store to consult.
    fn local_arch(&self, _image: &str) -> Option<String> {
        None
    }
    /// The layout is already on disk, so there is nothing to fetch.
    fn pull(&self, _image: &str, _force_refresh: bool) -> Result<(), String> {
        Ok(())
    }
    fn inspect(&self, image: &str) -> Result<serde_json::Value, String> {
        let OciLayoutImageSource { path, arch } = self;
        inspect_oci_layout(image, path, arch)
    }
    /// The layout directory itself is the saved form; `work_dir` is unused.
    fn save(&self, _image: &str, _work_dir: &Path) -> Result<PathBuf, String> {
        Ok(self.path.to_path_buf())
    }
}
/// `ImageSource` over an OCI archive file (`oci-archive:<file>`).
struct OciArchiveImageSource {
    path: PathBuf,
    arch: String,
}
impl ImageSource for OciArchiveImageSource {
    fn arch(&self) -> &str {
        self.arch.as_str()
    }
    fn local_arch(&self, _image: &str) -> Option<String> {
        None
    }
    /// The archive is a local file, so there is nothing to fetch.
    fn pull(&self, _image: &str, _force_refresh: bool) -> Result<(), String> {
        Ok(())
    }
    /// Extracts the archive into a throwaway temp dir, inspects it, and removes
    /// the temp dir whether or not inspection succeeded.
    fn inspect(&self, image: &str) -> Result<serde_json::Value, String> {
        let scratch = temp_work_dir("supermachine-oci-archive-inspect")?;
        let outcome = extract_oci_archive(&self.path, &scratch)
            .and_then(|layout| inspect_oci_layout(image, &layout, &self.arch));
        let _ = std::fs::remove_dir_all(&scratch);
        outcome
    }
    /// Extracts the archive under `work_dir` and returns the resulting layout path.
    fn save(&self, _image: &str, work_dir: &Path) -> Result<PathBuf, String> {
        extract_oci_archive(&self.path, work_dir)
    }
}
/// Locates the `supermachine-worker` binary.
///
/// Lookup order:
/// - macOS only:
///   1. `SUPERMACHINE_WORKER_BIN`, if it names an existing file. It is used
///      as-is after a best-effort `ensure_worker_signed`.
///   2. `SUPERMACHINE_WORKER_BIN_BUNDLED` (if a file), else
///      `crate::codesign::locate_worker_bin()`. A copy made by
///      `crate::assets::ensure_worker_in_user_dir` is preferred; if that copy
///      fails, the source path is used. Either way it is signed best-effort.
/// - Other targets: `crate::codesign::locate_worker_bin()`.
/// - Then `root/target/release/supermachine-worker` or `root/bin/supermachine-worker`.
///
/// If nothing is found, returns `root/target/release/supermachine-worker`
/// even though it does not exist. NOTE(review): presumably so callers can
/// report the expected path in an error.
pub(crate) fn supermachine_worker_bin(root: &Path) -> PathBuf {
    #[cfg(target_os = "macos")]
    {
        if let Some(p) = std::env::var_os("SUPERMACHINE_WORKER_BIN") {
            let p = PathBuf::from(p);
            if p.is_file() {
                // Signing errors are ignored; the path is returned regardless.
                let _ = crate::codesign::ensure_worker_signed(&p);
                return p;
            }
        }
        let source = std::env::var_os("SUPERMACHINE_WORKER_BIN_BUNDLED")
            .map(PathBuf::from)
            .filter(|p| p.is_file())
            .or_else(crate::codesign::locate_worker_bin);
        if let Some(p) = source {
            if let Ok(user_dir_copy) = crate::assets::ensure_worker_in_user_dir(&p) {
                let _ = crate::codesign::ensure_worker_signed(&user_dir_copy);
                return user_dir_copy;
            }
            let _ = crate::codesign::ensure_worker_signed(&p);
            return p;
        }
    }
    #[cfg(not(target_os = "macos"))]
    if let Some(p) = crate::codesign::locate_worker_bin() {
        return p;
    }
    // Development checkout layout first, then an installed prefix layout.
    for candidate in [
        "target/release/supermachine-worker",
        "bin/supermachine-worker",
    ] {
        let p = root.join(candidate);
        if p.is_file() {
            return p;
        }
    }
    root.join("target/release/supermachine-worker")
}
/// Locates the guest kernel image.
///
/// Lookup order:
/// 1. `root/crates/supermachine-kernel/kernel`
/// 2. `root/share/supermachine/kernel`
/// 3. `crate::assets::AssetPaths::discover().kernel`
///
/// If nothing is found, returns the first candidate path even though it does not exist.
pub(crate) fn supermachine_kernel(root: &Path) -> PathBuf {
    const RELATIVE_CANDIDATES: [&str; 2] = [
        "crates/supermachine-kernel/kernel",
        "share/supermachine/kernel",
    ];
    RELATIVE_CANDIDATES
        .iter()
        .map(|relative| root.join(relative))
        .find(|candidate| candidate.is_file())
        .or_else(|| crate::assets::AssetPaths::discover().kernel)
        .unwrap_or_else(|| root.join("crates/supermachine-kernel/kernel"))
}
/// Chooses the `ImageSource` for `image` targeting `arch`.
///
/// - `oci-layout:<dir>` selects a local OCI layout (requires `<dir>/index.json`).
/// - `oci-archive:<file>` selects an OCI archive (the file must exist).
/// - Otherwise `SUPERMACHINE_IMAGE_SOURCE` (case-insensitive) decides:
///   - `docker` / `container` use that CLI.
///   - `registry` or unset talks to the registry directly.
///   - `auto` uses the first reachable local runtime (docker, then container),
///     falling back to the registry.
///   - Any other value is an error.
fn select_image_source(image: &str, arch: &str) -> Result<Box<dyn ImageSource>, String> {
    if let Some(raw) = image.strip_prefix("oci-layout:") {
        let path = PathBuf::from(raw);
        return if path.join("index.json").is_file() {
            Ok(Box::new(OciLayoutImageSource {
                path,
                arch: arch.to_owned(),
            }))
        } else {
            Err(format!("OCI layout missing index.json: {}", path.display()))
        };
    }
    if let Some(raw) = image.strip_prefix("oci-archive:") {
        let path = PathBuf::from(raw);
        return if path.is_file() {
            Ok(Box::new(OciArchiveImageSource {
                path,
                arch: arch.to_owned(),
            }))
        } else {
            Err(format!("OCI archive not found: {}", path.display()))
        };
    }

    let docker = || -> Box<dyn ImageSource> {
        Box::new(DockerImageSource {
            arch: arch.to_owned(),
        })
    };
    let container = || -> Box<dyn ImageSource> {
        Box::new(ContainerImageSource {
            arch: arch.to_owned(),
        })
    };
    let registry = || -> Result<Box<dyn ImageSource>, String> {
        Ok(Box::new(RegistryImageSource {
            image: parse_registry_image_ref(image)?,
            cached_layout: RefCell::new(None),
            arch: arch.to_owned(),
        }))
    };

    let requested = std::env::var("SUPERMACHINE_IMAGE_SOURCE")
        .ok()
        .map(|s| s.to_lowercase());
    match requested.as_deref() {
        None | Some("registry") => registry(),
        Some("docker") => Ok(docker()),
        Some("container") => Ok(container()),
        Some("auto") => match detect_local_image_runtime() {
            Some(LocalImageRuntime::Docker) => Ok(docker()),
            Some(LocalImageRuntime::Container) => Ok(container()),
            None => registry(),
        },
        Some(other) => Err(format!(
            "SUPERMACHINE_IMAGE_SOURCE={other:?} unrecognized; expected one of \
             docker | container | registry | auto"
        )),
    }
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum LocalImageRuntime {
Docker,
Container,
}
fn detect_local_image_runtime() -> Option<LocalImageRuntime> {
static DETECTED: OnceLock<Option<LocalImageRuntime>> = OnceLock::new();
*DETECTED.get_or_init(|| {
if probe_runtime_reachable("docker", &["info"]) {
Some(LocalImageRuntime::Docker)
} else if probe_runtime_reachable("container", &["system", "info"]) {
Some(LocalImageRuntime::Container)
} else {
None
}
})
}
/// Returns `true` if `bin args...` exits successfully within 5 seconds.
///
/// stdout and stderr are discarded. A spawn failure, non-zero exit, status
/// error, or timeout yields `false`.
fn probe_runtime_reachable(bin: &str, args: &[&str]) -> bool {
    probe_runtime_reachable_within(bin, args, Duration::from_secs(5))
}

/// Like `probe_runtime_reachable`, but with a caller-chosen `timeout`.
///
/// A child that outlives the timeout, or whose status cannot be read, is
/// killed and then reaped with `wait`. Reaping avoids leaving a zombie
/// process behind.
fn probe_runtime_reachable_within(bin: &str, args: &[&str], timeout: Duration) -> bool {
    let mut child = match Command::new(bin)
        .args(args)
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
    {
        Ok(child) => child,
        Err(_) => return false,
    };
    let deadline = Instant::now() + timeout;
    loop {
        match child.try_wait() {
            Ok(Some(status)) => return status.success(),
            Ok(None) if Instant::now() < deadline => {
                std::thread::sleep(Duration::from_millis(50));
            }
            // Timed out, or the status could not be queried: stop and reap the child.
            _ => {
                let _ = child.kill();
                let _ = child.wait();
                return false;
            }
        }
    }
}
/// SHA-256 of `bytes`, as produced by `sha256_file`.
///
/// Writes the bytes to a file in a fresh temp dir, hashes that file, and
/// removes the temp dir whether or not hashing succeeded.
fn sha256_bytes(bytes: &[u8]) -> Result<String, String> {
    let scratch = temp_work_dir("supermachine-json-sha")?;
    let blob = scratch.join("blob");
    let digest = std::fs::write(&blob, bytes)
        .map_err(|e| format!("write digest blob: {e}"))
        .and_then(|()| sha256_file(&blob));
    let _ = std::fs::remove_dir_all(&scratch);
    digest
}
/// Milliseconds since the Unix epoch. Errors if the system clock is set before 1970.
fn epoch_ms() -> Result<u128, String> {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Ok(since_epoch.as_millis()),
        Err(e) => Err(format!("system time before Unix epoch: {e}")),
    }
}
/// Borrowed view of a `BakeRequest`, built by `BakePlan::from_request`.
/// Each field mirrors the request field of the same name.
pub(crate) struct BakePlan<'a> {
    image: &'a str,
    name: Option<&'a str>,
    runtime: &'a str,
    guest_port: u16,
    memory_mib: u32,
    vcpus: u32,
    pull_policy: &'a str,
    snapshots_dir: &'a Path,
    cmd_override: Option<&'a str>,
    extra_args: &'a [String],
    /// Platform string such as `linux/arm64`; see `BakePlan::arch`.
    platform: &'a str,
}
impl<'a> BakePlan<'a> {
    /// Borrows every field of `request`; nothing is copied.
    fn from_request(request: &'a BakeRequest) -> Self {
        Self {
            image: &request.image,
            name: request.name.as_deref(),
            runtime: &request.runtime,
            guest_port: request.guest_port,
            memory_mib: request.memory_mib,
            vcpus: request.vcpus,
            pull_policy: &request.pull_policy,
            snapshots_dir: &request.snapshots_dir,
            cmd_override: request.cmd_override.as_deref(),
            extra_args: &request.extra_args,
            platform: &request.platform,
        }
    }
    /// Architecture component of `platform` (`linux/amd64` -> `amd64`).
    ///
    /// Falls back to `arm64` when `platform` has no `/`, or when the segment
    /// after it is empty (e.g. `linux/`). An empty arch would otherwise produce
    /// invalid `--platform=linux/` arguments downstream.
    fn arch(&self) -> &str {
        self.platform
            .split('/')
            .nth(1)
            .filter(|arch| !arch.is_empty())
            .unwrap_or("arm64")
    }
    /// Explicit `name`, or a name derived from the image via `sanitized_snapshot_name`.
    fn snapshot_name(&self) -> String {
        self.name
            .map(ToOwned::to_owned)
            .unwrap_or_else(|| sanitized_snapshot_name(self.image))
    }
    /// `<snapshots_dir>/<snapshot_name>/metadata.json`.
    fn metadata_path(&self) -> PathBuf {
        self.snapshots_dir
            .join(self.snapshot_name())
            .join("metadata.json")
    }
    /// Builds the `tools/supermachine-push` invocation for the non-native bake path.
    ///
    /// Passes image, runtime, port and memory. It forces `--pull never`
    /// because `run_push` has already resolved the image, and exports
    /// `SUPERMACHINE_SNAPSHOTS`. It then adds `--name` if set, followed by
    /// `extra_args` verbatim.
    ///
    /// # Errors
    /// Returns an error if the tool script does not exist under `root`.
    fn command(&self, root: &Path) -> Result<Command, String> {
        let push = root.join("tools/supermachine-push");
        if !push.is_file() {
            return Err(format!("missing image bake tool: {}", push.display()));
        }
        let mut cmd = Command::new(push);
        cmd.arg(self.image)
            .arg("--runtime")
            .arg(self.runtime)
            .arg("--port")
            .arg(self.guest_port.to_string())
            .arg("--memory")
            .arg(self.memory_mib.to_string())
            .arg("--pull")
            .arg("never")
            .env("SUPERMACHINE_SNAPSHOTS", self.snapshots_dir);
        if let Some(name) = self.name {
            cmd.arg("--name").arg(name);
        }
        cmd.args(self.extra_args);
        Ok(cmd)
    }
}
/// Result of resolving an image before baking (produced by `resolve_image`).
/// NOTE(review): field meanings below are inferred from names and from the
/// trace in `emit_image_resolution_trace`; confirm against `resolve_image`.
struct ImageResolution {
    /// Architecture of a local copy, if the source has one.
    local_arch: Option<String>,
    architecture: Option<String>,
    image_id: Option<String>,
    /// Command the guest should run (serialized to JSON in traces).
    effective_cmd: Vec<String>,
    working_dir: Option<String>,
    user: Option<String>,
    env: Vec<String>,
    env_count: usize,
    /// Human-readable description of what the pull step did (trace only).
    pull_action: String,
    inspect_ms: u128,
}
/// Plan for materializing image layers (produced by `plan_layers`).
struct LayerPlan {
    cache_dir: PathBuf,
    index_path: Option<PathBuf>,
    layer_shas: Vec<String>,
    /// Scratch dir from `ImageSource::save`; `run_push` removes it after
    /// layer materialization, whether or not that succeeded.
    save_work_dir: Option<PathBuf>,
    save_dir: Option<PathBuf>,
    cached_layers: usize,
    missing_layers: usize,
    manifest_cache_hit: bool,
    plan_ms: u128,
}
/// Timing and counts from `materialize_missing_layers`.
struct LayerMaterialization {
    materialize_ms: u128,
    built_layers: usize,
    reused_layers: usize,
}
/// Outcome of `materialize_delta_cache`. `skipped` carries a reason string when
/// the step was not performed (trace only).
struct DeltaMaterialization {
    prepare_ms: u128,
    materialize_ms: u128,
    cache_hit: bool,
    skipped: Option<String>,
    key: Option<String>,
    cache_path: Option<PathBuf>,
}
/// Outcome of a native bake or snapshot reuse: total time, a JSON timing
/// breakdown (printed in traces), and whether an existing snapshot was reused.
struct NativeBakeResult {
    total_ms: u128,
    timings: serde_json::Value,
    reused: bool,
}
/// Process-wide counter. NOTE(review): not referenced in this chunk;
/// presumably used by `temp_work_dir` to keep scratch directory names unique.
static TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Prints a one-line summary of an image resolution to stderr.
///
/// The image id is shortened to its first 16 characters, and the effective
/// command is printed as JSON (`[]` if serialization fails). `run_push` only
/// calls this when `trace_enabled()`.
fn emit_image_resolution_trace(resolution: &ImageResolution) {
    let cmd_json =
        serde_json::to_string(&resolution.effective_cmd).unwrap_or_else(|_| "[]".to_owned());
    let image_id = resolution.image_id.as_deref().unwrap_or("");
    // Cut on a char boundary. A byte slice `&s[..16]` panics if byte 16 falls
    // inside a multi-byte character; for ASCII ids the result is identical.
    let short_image_id = image_id
        .char_indices()
        .nth(16)
        .map_or(image_id, |(end, _)| &image_id[..end]);
    eprintln!(
        "supermachine: image resolved inspect_ms={} pull={} local_arch={} arch={} image_id={} env_count={} cmd={}",
        resolution.inspect_ms,
        resolution.pull_action,
        resolution.local_arch.as_deref().unwrap_or(""),
        resolution.architecture.as_deref().unwrap_or(""),
        short_image_id,
        resolution.env_count,
        cmd_json
    );
}
/// Bakes `request.image` into a named snapshot under `request.snapshots_dir`.
///
/// Flow as implemented below. The "native tail" is the `supermachine` runtime
/// path, unless `SUPERMACHINE_NATIVE_BAKE_TAIL` is `0` or `false`.
/// 1. macOS: verify the worker binary via `crate::codesign::verify_worker_version`.
/// 2. Native tail: try the lock-free fast paths, first an existing-snapshot
///    cache hit and then a sibling clone.
/// 3. Take the per-name bake lock, then re-check both fast paths, since a
///    concurrent baker may have produced the snapshot while we waited.
/// 4. Take a global bake slot, select an `ImageSource`, resolve the image,
///    and (native tail) try an early snapshot reuse.
/// 5. Plan and materialize layers, then materialize the delta cache.
/// 6. Native tail: reuse a snapshot keyed by the native bake key, or run the
///    native bake. Otherwise run `tools/supermachine-push`.
///
/// `run_t0` marks the start of the overall run and is used for timing traces.
/// `root` is the checkout/install root used to locate tools, worker and kernel.
///
/// # Errors
/// Propagates errors from any phase. The external push tool's non-zero exit
/// is reported as `supermachine-push failed with exit code N`.
pub fn run_push(request: &BakeRequest, run_t0: Instant, root: &Path) -> Result<(), String> {
    /// First 16 characters of `s` (all of it if shorter), cut on a char
    /// boundary so non-ASCII input cannot panic the way a byte slice can.
    fn prefix16(s: &str) -> &str {
        s.char_indices().nth(16).map_or(s, |(end, _)| &s[..end])
    }

    let _span = tracing::info_span!(
        "supermachine.bake",
        image = %request.image,
        memory_mib = request.memory_mib,
        vcpus = request.vcpus,
    )
    .entered();
    #[cfg(target_os = "macos")]
    {
        let worker_bin = supermachine_worker_bin(root);
        crate::codesign::verify_worker_version(&worker_bin)?;
    }
    let plan = BakePlan::from_request(request);
    if trace_enabled() {
        eprintln!(
            "supermachine: bake plan image={} name={} runtime={} port={} memory={}MiB snapshots={} pull={}",
            plan.image,
            plan.snapshot_name(),
            plan.runtime,
            plan.guest_port,
            plan.memory_mib,
            plan.snapshots_dir.display(),
            plan.pull_policy
        );
    }
    // Read the opt-out once so every phase below takes the same branch.
    let native_tail = plan.runtime == "supermachine"
        && std::env::var("SUPERMACHINE_NATIVE_BAKE_TAIL")
            .map(|v| v != "0" && v != "false")
            .unwrap_or(true);

    // Phase 1: cheap fast paths, before taking any lock.
    if native_tail {
        if let Some(result) = try_fast_cache_hit(&plan, root, run_t0)? {
            if trace_enabled() {
                eprintln!(
                    "supermachine: fast cache-hit after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
                eprintln!("supermachine: bake timings {}", result.timings);
            }
            return Ok(());
        }
        if let Some(result) = try_sibling_clone_fast_path(&plan, root, run_t0)? {
            if trace_enabled() {
                eprintln!(
                    "supermachine: sibling-clone fast path after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
                eprintln!("supermachine: bake timings {}", result.timings);
            }
            return Ok(());
        }
    }

    // Phase 2: serialize same-name bakes, then re-check the fast paths in case
    // the previous lock holder just produced this snapshot.
    let _name_lock = acquire_bake_name_lock(&plan, run_t0)?;
    if native_tail {
        if try_fast_cache_hit(&plan, root, run_t0)?.is_some() {
            if trace_enabled() {
                eprintln!(
                    "supermachine: snapshot {} produced by a concurrent baker; reusing (total={}ms)",
                    plan.snapshot_name(),
                    elapsed_ms(run_t0)
                );
            }
            return Ok(());
        }
        if try_sibling_clone_fast_path(&plan, root, run_t0)?.is_some() {
            return Ok(());
        }
    }

    // Phase 3: global throttle, then image resolution.
    let _bake_slot = acquire_bake_slot(run_t0)?;
    let source = select_image_source(plan.image, plan.arch())?;
    let resolution = resolve_image(&plan, source.as_ref())?;
    if trace_enabled() {
        emit_image_resolution_trace(&resolution);
    }
    if native_tail {
        if let Some(result) =
            native_supermachine_early_reuse_snapshot(&plan, &resolution, root, run_t0)?
        {
            if trace_enabled() {
                eprintln!(
                    "supermachine: native bake reused snapshot after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
                eprintln!("supermachine: bake timings {}", result.timings);
                eprintln!(
                    "supermachine: push finished after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
            }
            return Ok(());
        }
    }

    // Phase 4: layers and delta cache.
    let layer_plan = plan_layers(&plan, &resolution, source.as_ref())?;
    let layer_materialization_result = match layer_plan.as_ref() {
        Some(layer_plan) => {
            materialize_missing_layers(plan.image, layer_plan, source.as_ref()).map(Some)
        }
        None => Ok(None),
    };
    // Remove the save scratch dir before propagating any materialization error,
    // so it is cleaned up on both success and failure.
    if let Some(work_dir) = layer_plan
        .as_ref()
        .and_then(|layer_plan| layer_plan.save_work_dir.as_deref())
    {
        let _ = std::fs::remove_dir_all(work_dir);
    }
    let layer_materialization = layer_materialization_result?;
    let delta_materialization = materialize_delta_cache(&plan, &resolution, root)?;
    if trace_enabled() {
        if let Some(layer_plan) = layer_plan.as_ref() {
            let first_layer = layer_plan
                .layer_shas
                .first()
                .map(|s| prefix16(s))
                .unwrap_or("");
            eprintln!(
                "supermachine: layer plan plan_ms={} layers={} cached={} missing={} manifest_cache_hit={} cache={} index={} first_layer={}",
                layer_plan.plan_ms,
                layer_plan.layer_shas.len(),
                layer_plan.cached_layers,
                layer_plan.missing_layers,
                layer_plan.manifest_cache_hit,
                layer_plan.cache_dir.display(),
                layer_plan
                    .index_path
                    .as_deref()
                    .map(|p| p.display().to_string())
                    .unwrap_or_default(),
                first_layer
            );
        }
        if let Some(materialization) = layer_materialization.as_ref() {
            eprintln!(
                "supermachine: layer materialize materialize_ms={} built={} reused={}",
                materialization.materialize_ms,
                materialization.built_layers,
                materialization.reused_layers
            );
        }
        if let Some(delta) = delta_materialization.as_ref() {
            eprintln!(
                "supermachine: delta materialize prepare_ms={} materialize_ms={} cache_hit={} skipped={} key={} cache={}",
                delta.prepare_ms,
                delta.materialize_ms,
                delta.cache_hit,
                delta.skipped.as_deref().unwrap_or(""),
                delta.key.as_deref().map(prefix16).unwrap_or(""),
                delta
                    .cache_path
                    .as_deref()
                    .map(|path| path.display().to_string())
                    .unwrap_or_default()
            );
        }
    }

    // Phase 5a: native bake tail.
    if native_tail {
        let layer_plan = layer_plan
            .as_ref()
            .ok_or_else(|| "missing layer plan".to_owned())?;
        let delta = delta_materialization
            .as_ref()
            .ok_or_else(|| "missing delta cache".to_owned())?;
        let (native_bake_key, native_bake_inputs) =
            native_supermachine_bake_key(&plan, &resolution, layer_plan, delta, root)?;
        if let Some(result) = native_supermachine_reuse_snapshot(&plan, &native_bake_key, run_t0)? {
            if trace_enabled() {
                eprintln!(
                    "supermachine: native bake reused snapshot after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
                eprintln!("supermachine: bake timings {}", result.timings);
                eprintln!(
                    "supermachine: push finished after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
            }
            return Ok(());
        }
        let native_t0 = Instant::now();
        let result = run_native_supermachine_bake(
            &plan,
            &resolution,
            layer_plan,
            delta,
            root,
            &native_bake_key,
            &native_bake_inputs,
        )?;
        if trace_enabled() {
            if result.reused {
                eprintln!(
                    "supermachine: native bake reused snapshot after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
            } else {
                eprintln!(
                    "supermachine: native bake finished after {}ms total={}ms",
                    result.total_ms,
                    elapsed_ms(run_t0)
                );
            }
            eprintln!("supermachine: bake timings {}", result.timings);
            eprintln!(
                "supermachine: push finished after {}ms total={}ms",
                elapsed_ms(native_t0),
                elapsed_ms(run_t0)
            );
        }
        return Ok(());
    }

    // Phase 5b: external push tool for non-native runtimes.
    let mut cmd = plan.command(root)?;
    let push_t0 = Instant::now();
    let status = cmd
        .status()
        .map_err(|e| format!("supermachine-push: {e}"))?;
    let push_ms = elapsed_ms(push_t0);
    if trace_enabled() {
        eprintln!(
            "supermachine: push finished after {}ms total={}ms",
            push_ms,
            elapsed_ms(run_t0)
        );
    }
    if status.success() {
        emit_bake_timing_metadata(&plan);
        Ok(())
    } else {
        Err(format!(
            "supermachine-push failed with exit code {}",
            status.code().unwrap_or(1)
        ))
    }
}
/// Joins `path` onto `$HOME`, or onto `.` when `HOME` is unset.
fn home_join(path: &str) -> PathBuf {
    let base = match std::env::var_os("HOME") {
        Some(home) => PathBuf::from(home),
        None => PathBuf::from("."),
    };
    base.join(path)
}
/// Holds one global bake-concurrency slot for as long as it lives.
///
/// `_file` is the slot file holding an exclusive `flock`. Closing it on drop
/// releases the lock. `None` means throttling is disabled
/// (`bake_concurrency_limit()` returned 0) and nothing is held.
struct BakeSlotGuard {
    _file: Option<std::fs::File>,
}
/// Holds the per-snapshot-name bake lock (`.<name>.bakelock` in the snapshots
/// dir) for as long as it lives. The `flock` is released when `_file` closes.
struct BakeNameLock {
    _file: Option<std::fs::File>,
}
/// Serializes bakes of the same snapshot name across processes.
///
/// Creates the snapshots dir if needed, then opens (creating if needed)
/// `<snapshots_dir>/.<name>.bakelock`. It takes an exclusive non-blocking
/// `flock`, retrying every 50 ms for up to 5 minutes. The lock is held for as
/// long as the returned guard keeps the file open.
///
/// # Errors
/// Fails in three cases:
/// - the directory or lock file cannot be created or opened;
/// - `flock` fails for any reason other than contention (`EWOULDBLOCK`) or a
///   signal (`EINTR`);
/// - the lock is still held elsewhere after the timeout.
fn acquire_bake_name_lock(plan: &BakePlan<'_>, run_t0: Instant) -> Result<BakeNameLock, String> {
    use std::os::fd::AsRawFd;
    let dir = plan.snapshots_dir;
    std::fs::create_dir_all(dir)
        .map_err(|e| format!("mkdir snapshots dir {}: {e}", dir.display()))?;
    let lock_path = dir.join(format!(".{}.bakelock", plan.snapshot_name()));
    let file = std::fs::OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(false)
        .open(&lock_path)
        .map_err(|e| format!("open bake name-lock {}: {e}", lock_path.display()))?;
    let wait_t0 = Instant::now();
    let deadline = wait_t0 + Duration::from_secs(300);
    let mut waited = false;
    loop {
        // SAFETY: `file` stays open for the whole call, so the descriptor is
        // valid. flock takes no pointers.
        let r = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
        if r == 0 {
            if waited && trace_enabled() {
                eprintln!(
                    "supermachine: bake name-lock for {} acquired (wait={}ms total={}ms)",
                    plan.snapshot_name(),
                    elapsed_ms(wait_t0),
                    elapsed_ms(run_t0)
                );
            }
            return Ok(BakeNameLock { _file: Some(file) });
        }
        // Only contention or an interrupted call is worth retrying. Any other
        // error (e.g. EBADF/ENOLCK) would otherwise spin silently until the
        // 5-minute timeout and then blame a "wedged" bake.
        let err = std::io::Error::last_os_error();
        if !matches!(
            err.kind(),
            std::io::ErrorKind::WouldBlock | std::io::ErrorKind::Interrupted
        ) {
            return Err(format!(
                "flock bake name-lock {}: {err}",
                lock_path.display()
            ));
        }
        if !waited {
            if trace_enabled() {
                eprintln!(
                    "supermachine: another process is baking {}; waiting for the bake lock",
                    plan.snapshot_name()
                );
            }
            waited = true;
        }
        if Instant::now() > deadline {
            return Err(format!(
                "bake name-lock timeout after 5 min for {}; a same-name bake \
                 elsewhere may be wedged",
                plan.snapshot_name()
            ));
        }
        std::thread::sleep(Duration::from_millis(50));
    }
}
/// Maximum number of concurrent bakes, read from `SUPERMACHINE_CONCURRENT_BAKES`.
///
/// An unset or unparsable value yields `0`. `acquire_bake_slot` treats `0` as
/// "throttling disabled".
fn bake_concurrency_limit() -> usize {
    std::env::var("SUPERMACHINE_CONCURRENT_BAKES")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .unwrap_or(0)
}
/// Total physical RAM in bytes.
///
/// On macOS this queries the `hw.memsize` sysctl. On every other platform,
/// and whenever that query fails or reports zero, a fixed 16 GiB is returned.
fn sysinfo_total_ram_bytes() -> u64 {
    const FALLBACK_BYTES: u64 = 16 * 1024 * 1024 * 1024;
    #[cfg(target_os = "macos")]
    {
        let mut bytes: u64 = 0;
        let mut bytes_len = std::mem::size_of::<u64>();
        // SAFETY: the name is a NUL-terminated C string literal, and the output
        // buffer/length pair describes a live, correctly sized `u64`.
        let rc = unsafe {
            libc::sysctlbyname(
                c"hw.memsize".as_ptr(),
                (&mut bytes as *mut u64).cast::<libc::c_void>(),
                &mut bytes_len,
                std::ptr::null_mut(),
                0,
            )
        };
        if rc == 0 && bytes > 0 {
            return bytes;
        }
    }
    FALLBACK_BYTES
}
/// Take one of `SUPERMACHINE_CONCURRENT_BAKES` global bake slots.
///
/// With a limit of `0` this returns immediately with an empty guard (no
/// throttling). Otherwise it creates `~/.local/supermachine-bake-slots`,
/// starts at slot `pid % n`, and tries a non-blocking exclusive `flock` on
/// each `slot.<i>` file in turn. When every slot is busy it sleeps 50 ms
/// between sweeps, for up to 5 minutes.
///
/// # Errors
/// Fails in any of these cases:
/// - the slot directory or a slot file cannot be created;
/// - `flock` reports an error other than contention (surfaced immediately
///   instead of being mistaken for a busy slot);
/// - all slots remain held past the deadline.
fn acquire_bake_slot(run_t0: Instant) -> Result<BakeSlotGuard, String> {
    use std::os::fd::AsRawFd;
    let n = bake_concurrency_limit();
    if n == 0 {
        return Ok(BakeSlotGuard { _file: None });
    }
    let dir = home_join(".local/supermachine-bake-slots");
    std::fs::create_dir_all(&dir)
        .map_err(|e| format!("mkdir bake-slots {}: {e}", dir.display()))?;
    // Spread processes across slots so they don't all contend on slot 0.
    let start = std::process::id() as usize % n;
    let deadline = Instant::now() + Duration::from_secs(300);
    let mut waited_first = false;
    loop {
        for offset in 0..n {
            let i = (start + offset) % n;
            let slot = dir.join(format!("slot.{i}"));
            let file = std::fs::OpenOptions::new()
                .create(true)
                .read(true)
                .write(true)
                .truncate(false)
                .open(&slot)
                .map_err(|e| format!("open bake slot {}: {e}", slot.display()))?;
            loop {
                // SAFETY: `file` stays open for the duration of the call, so
                // the raw descriptor passed to flock is valid.
                let r = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
                if r == 0 {
                    if waited_first && trace_enabled() {
                        eprintln!(
                            "supermachine: bake slot acquired (slot={}, wait={}ms)",
                            i,
                            elapsed_ms(run_t0)
                        );
                    }
                    return Ok(BakeSlotGuard { _file: Some(file) });
                }
                let err = std::io::Error::last_os_error();
                match err.kind() {
                    // A signal interrupted the attempt: retry this slot.
                    std::io::ErrorKind::Interrupted => continue,
                    // Slot held elsewhere: move on to the next one.
                    std::io::ErrorKind::WouldBlock => break,
                    // Not contention; waiting would not help.
                    _ => {
                        return Err(format!("flock bake slot {}: {err}", slot.display()));
                    }
                }
            }
            // `file` is dropped here, closing our descriptor for the busy slot.
        }
        if !waited_first {
            if trace_enabled() {
                eprintln!(
                    "supermachine: all {n} bake slots busy, waiting (set SUPERMACHINE_CONCURRENT_BAKES=N to tune)"
                );
            }
            waited_first = true;
        }
        if Instant::now() > deadline {
            return Err(format!(
                "bake-slot acquire timeout after 5 min ({n} slots, all held). \
                 A bake elsewhere may be wedged; consider \
                 SUPERMACHINE_CONCURRENT_BAKES=0 to disable the throttle."
            ));
        }
        std::thread::sleep(Duration::from_millis(50));
    }
}
/// Layer cache directory.
///
/// Uses `$SUPERMACHINE_LAYER_CACHE` when set, otherwise
/// `~/.local/supermachine-layer-cache`.
fn layer_cache_dir() -> PathBuf {
    match std::env::var_os("SUPERMACHINE_LAYER_CACHE") {
        Some(dir) => PathBuf::from(dir),
        None => home_join(".local/supermachine-layer-cache"),
    }
}
/// Directory holding named volume images.
///
/// Uses `$SUPERMACHINE_VOLUMES` when set, otherwise
/// `~/.local/supermachine/volumes`.
fn volumes_dir() -> PathBuf {
    match std::env::var_os("SUPERMACHINE_VOLUMES") {
        Some(dir) => PathBuf::from(dir),
        None => home_join(".local/supermachine/volumes"),
    }
}
/// One parsed `--volume HOST:GUEST[:SIZE_BYTES]` mapping.
#[derive(Debug, Clone)]
pub(crate) struct VolumeMapping {
    /// Backing image file on the host.
    pub host_file: PathBuf,
    /// Absolute mount point inside the guest.
    pub guest_path: String,
    /// Requested image size in bytes. Kept for completeness but marked
    /// `dead_code`: `ensure_volume_host_file` receives the size separately.
    #[allow(dead_code)]
    pub size_bytes: u64,
}
/// Parse every `--volume HOST:GUEST[:SIZE_BYTES]` occurrence in `extra_args`.
///
/// - `HOST` is used as a path when it contains `/` or starts with `.`.
///   Otherwise it is a bare volume name, stored as `<volumes_dir>/<name>.img`.
/// - `GUEST` must be an absolute path with no CR or LF.
/// - `SIZE_BYTES` defaults to `VolumeSpec::DEFAULT_SIZE_BYTES`.
///
/// # Errors
/// Returns a message describing the first malformed mapping.
pub(crate) fn parse_volume_args(extra_args: &[String]) -> Result<Vec<VolumeMapping>, String> {
    arg_values(extra_args, "--volume")
        .into_iter()
        .map(|raw| {
            let mut fields = raw.splitn(3, ':');
            let host = fields.next().unwrap_or_default();
            let Some(guest) = fields.next() else {
                return Err(format!(
                    "--volume expects HOST:GUEST[:SIZE_BYTES], got {raw:?}"
                ));
            };
            let size_bytes = match fields.next() {
                Some(s) => s
                    .parse::<u64>()
                    .map_err(|_| format!("--volume SIZE_BYTES not a u64: {s:?}"))?,
                None => crate::vmm::resources::VolumeSpec::DEFAULT_SIZE_BYTES,
            };
            if host.is_empty() || guest.is_empty() {
                return Err(format!("--volume HOST:GUEST has empty side: {raw:?}"));
            }
            if !guest.starts_with('/') {
                return Err(format!("--volume guest path must be absolute: {guest:?}"));
            }
            if guest.contains('\n') || guest.contains('\r') {
                return Err(format!("--volume guest path contains newline: {guest:?}"));
            }
            let looks_like_path = host.contains('/') || host.starts_with('.');
            let host_file = if looks_like_path {
                PathBuf::from(host)
            } else if host.chars().any(|c| matches!(c, '/' | '\\' | ':' | '\0')) {
                return Err(format!("--volume name {host:?} contains forbidden chars"));
            } else {
                volumes_dir().join(format!("{host}.img"))
            };
            Ok(VolumeMapping {
                host_file,
                guest_path: guest.to_owned(),
                size_bytes,
            })
        })
        .collect()
}
/// Find an ext4 formatter.
///
/// Asks `which` for `mkfs.ext4`, `mke2fs` and `/usr/sbin/mkfs.ext4`, in that
/// order. If none resolves, it falls back to the `mke2fs` shipped in the
/// Android SDK platform tools under `$HOME/Library/Android/sdk`.
fn locate_mke2fs() -> Option<PathBuf> {
    let on_path = ["mkfs.ext4", "mke2fs", "/usr/sbin/mkfs.ext4"]
        .into_iter()
        .find_map(|candidate| {
            let out = Command::new("which").arg(candidate).output().ok()?;
            if !out.status.success() {
                return None;
            }
            let resolved = String::from_utf8_lossy(&out.stdout).trim().to_owned();
            (!resolved.is_empty()).then(|| PathBuf::from(resolved))
        });
    on_path.or_else(|| {
        let home = std::env::var_os("HOME")?;
        let bundled = PathBuf::from(home).join("Library/Android/sdk/platform-tools/mke2fs");
        bundled.is_file().then_some(bundled)
    })
}
/// Ensure the host-side backing image for `mapping` exists and is formatted.
///
/// If a regular file is already present, it is accepted as-is when it is at
/// least `size_bytes` long and rejected when it is smaller. Otherwise a new
/// sparse file of `size_bytes` is created and formatted as ext4 (label
/// `supermachine-vol`) with the formatter found by `locate_mke2fs`.
///
/// Safeguards:
/// - The formatter is located *before* anything is created.
/// - The new file is removed if sizing, spawning the formatter, or formatting
///   fails, so an unformatted image is never left behind to be accepted as
///   "ready" on the next run.
/// - The file is created with `create_new`, so a file that appears between
///   the existence check and creation is checked, never reformatted.
///
/// # Errors
/// Fails in any of these cases:
/// - directory creation fails;
/// - an existing image is too small;
/// - no formatter is available;
/// - file creation or sizing fails;
/// - the formatter cannot be spawned or exits unsuccessfully.
pub(crate) fn ensure_volume_host_file(
    mapping: &VolumeMapping,
    size_bytes: u64,
) -> Result<(), String> {
    let path = &mapping.host_file;
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| format!("create volume dir {}: {e}", parent.display()))?;
    }
    // Accept an existing image that is large enough; never shrink or reformat it.
    let check_existing = || -> Result<(), String> {
        let len = std::fs::metadata(path)
            .map_err(|e| format!("stat {}: {e}", path.display()))?
            .len();
        if len >= size_bytes {
            Ok(())
        } else {
            Err(format!(
                "volume {} exists at {len} bytes, smaller than requested {size_bytes}; \
                 remove it or pass a smaller size",
                path.display()
            ))
        }
    };
    if path.is_file() {
        return check_existing();
    }
    let mke2fs = locate_mke2fs().ok_or_else(|| {
        format!(
            "no `mke2fs` / `mkfs.ext4` on PATH; install e2fsprogs (\
             `brew install e2fsprogs`) so volume {} can be formatted",
            path.display()
        )
    })?;
    let file = match std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .create_new(true)
        .open(path)
    {
        Ok(f) => f,
        // Someone else created it since our check: treat it as an existing image.
        Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists && path.is_file() => {
            return check_existing();
        }
        Err(e) => return Err(format!("create {}: {e}", path.display())),
    };
    let sized = file.set_len(size_bytes);
    drop(file);
    if let Err(e) = sized {
        let _ = std::fs::remove_file(path);
        return Err(format!("truncate {}: {e}", path.display()));
    }
    let mut cmd = Command::new(&mke2fs);
    cmd.arg("-t")
        .arg("ext4")
        .arg("-F")
        .arg("-L")
        .arg("supermachine-vol")
        .arg(path)
        .stdout(Stdio::null())
        .stderr(Stdio::piped());
    let out = match cmd.output() {
        Ok(out) => out,
        Err(e) => {
            let _ = std::fs::remove_file(path);
            return Err(format!("spawn {}: {e}", mke2fs.display()));
        }
    };
    if !out.status.success() {
        let _ = std::fs::remove_file(path);
        return Err(format!(
            "{} {}: exit {:?}: {}",
            mke2fs.display(),
            path.display(),
            out.status.code(),
            String::from_utf8_lossy(&out.stderr).trim()
        ));
    }
    Ok(())
}
/// Whether tracing for the `run` scope is on. This gates every
/// `supermachine: ...` diagnostic line this module writes to stderr.
fn trace_enabled() -> bool {
    crate::trace::enabled("run")
}
/// Whole milliseconds elapsed since `t0`. Saturates to zero if `t0` is in the
/// future.
fn elapsed_ms(t0: Instant) -> u128 {
    let since = Instant::now().saturating_duration_since(t0);
    since.as_millis()
}
/// Public name of the snapshot baked from `image`.
///
/// Delegates to the private sanitizer so the naming rule lives in one place.
pub fn snapshot_name_for_image(image: &str) -> String {
    sanitized_snapshot_name(image)
}
/// Turn an image reference into a filesystem-friendly snapshot name.
///
/// Every `:`, `/` and `.` becomes `_`, and only the first 60 characters are
/// kept. Because of that truncation, references sharing their first 60
/// characters map to the same name.
fn sanitized_snapshot_name(image: &str) -> String {
    const MAX_CHARS: usize = 60;
    let mut name = String::with_capacity(image.len().min(MAX_CHARS));
    for ch in image.chars().take(MAX_CHARS) {
        name.push(match ch {
            ':' | '/' | '.' => '_',
            other => other,
        });
    }
    name
}
/// When tracing is enabled, print the `timings` object from the bake metadata
/// JSON at `plan.metadata_path()`.
///
/// This is best-effort: a missing file, invalid JSON, or an absent `timings`
/// key prints nothing.
fn emit_bake_timing_metadata(plan: &BakePlan<'_>) {
    if trace_enabled() {
        let timings = std::fs::read_to_string(plan.metadata_path())
            .ok()
            .and_then(|text| serde_json::from_str::<serde_json::Value>(&text).ok())
            .and_then(|json| json.get("timings").cloned());
        if let Some(timings) = timings {
            eprintln!("supermachine: bake timings {timings}");
        }
    }
}
/// Run `cmd` to completion and return its stdout as UTF-8.
///
/// # Errors
/// - Spawn failure: `"{label}: {io error}"`.
/// - Non-zero exit: `"{label}: "` followed by the first non-blank stderr line,
///   else the first non-blank stdout line, else `command failed`.
/// - Success with non-UTF-8 stdout: a `non-utf8 stdout` error.
fn command_output(mut cmd: Command, label: &str) -> Result<String, String> {
    let output = cmd.output().map_err(|e| format!("{label}: {e}"))?;
    if !output.status.success() {
        let first_meaningful = |bytes: &[u8]| -> Option<String> {
            String::from_utf8_lossy(bytes)
                .lines()
                .find(|line| !line.trim().is_empty())
                .map(str::to_owned)
        };
        let detail = first_meaningful(&output.stderr)
            .or_else(|| first_meaningful(&output.stdout))
            .unwrap_or_else(|| "command failed".to_owned());
        return Err(format!("{label}: {detail}"));
    }
    String::from_utf8(output.stdout).map_err(|e| format!("{label}: non-utf8 stdout: {e}"))
}
/// Run `cmd` and require a successful exit.
///
/// # Errors
/// - Spawn failure: `"{label}: {io error}"`.
/// - Non-zero exit: `"{label} failed with exit code N"`.
/// - Killed by a signal (Unix): `"{label} terminated by signal N"`, rather
///   than an invented exit code.
fn run_status(mut cmd: Command, label: &str) -> Result<(), String> {
    let status = cmd.status().map_err(|e| format!("{label}: {e}"))?;
    if status.success() {
        return Ok(());
    }
    if let Some(code) = status.code() {
        return Err(format!("{label} failed with exit code {code}"));
    }
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt as _;
        if let Some(sig) = status.signal() {
            return Err(format!("{label} terminated by signal {sig}"));
        }
    }
    // No exit code and no signal: keep the historical fallback wording.
    Err(format!("{label} failed with exit code 1"))
}
/// Run `cmd` to completion and return its raw exit status.
///
/// Only a spawn or wait failure is an error (`"{label}: {io error}"`); a
/// non-zero exit is returned as a normal status for the caller to inspect.
fn spawn_status(mut cmd: Command, label: &str) -> Result<std::process::ExitStatus, String> {
    match cmd.status() {
        Ok(status) => Ok(status),
        Err(e) => Err(format!("{label}: {e}")),
    }
}
/// Read a JSON field that may be either an array of strings or a single string.
///
/// - Arrays keep only their string elements.
/// - A non-empty string becomes a one-element vector.
/// - Anything else (absent, empty string, other JSON types) yields an empty
///   vector.
fn value_string_array(value: Option<&serde_json::Value>) -> Vec<String> {
    if let Some(serde_json::Value::Array(items)) = value {
        return items
            .iter()
            .filter_map(serde_json::Value::as_str)
            .map(str::to_owned)
            .collect();
    }
    match value.and_then(serde_json::Value::as_str) {
        Some(single) if !single.is_empty() => vec![single.to_owned()],
        _ => Vec::new(),
    }
}
#[cfg_attr(
    not(all(target_os = "linux", target_arch = "x86_64")),
    allow(dead_code)
)]
/// Quote `s` as a single POSIX shell word.
///
/// The text is wrapped in single quotes. Each embedded `'` becomes `'\''`:
/// close the quote, emit an escaped quote, reopen.
fn sh_single_quote(s: &str) -> String {
    let body = s.replace('\'', "'\\''");
    format!("'{body}'")
}
#[cfg_attr(
    not(all(target_os = "linux", target_arch = "x86_64")),
    allow(dead_code)
)]
/// Build a busybox `sh` script that launches the image's configured workload.
///
/// The image config comes from `oci::inspect_oci_layout`. The script:
/// - exports each `Env` entry of the form `K=V` (entries without `=` are
///   skipped);
/// - `cd`s into `WorkingDir` when it is set, ignoring failure;
/// - `exec`s `Entrypoint` followed by `Cmd`, every argument single-quoted.
///
/// Returns `Ok(None)` when there is no `Config` or the combined argv is empty.
/// NOTE(review): env keys are written unquoted, so a key that is not a valid
/// shell name will likely make `export` fail in the guest.
pub(crate) fn kvm_workload_script(
    image: &str,
    layout: &Path,
    arch: &str,
) -> Result<Option<String>, String> {
    let info = oci::inspect_oci_layout(image, layout, arch)?;
    let Some(cfg) = info.get("Config") else {
        return Ok(None);
    };
    let argv: Vec<String> = value_string_array(cfg.get("Entrypoint"))
        .into_iter()
        .chain(value_string_array(cfg.get("Cmd")))
        .collect();
    if argv.is_empty() {
        return Ok(None);
    }
    let mut script = String::from("#!/.supermachine/busybox sh\n");
    for entry in value_string_array(cfg.get("Env")) {
        if let Some((key, val)) = entry.split_once('=') {
            script.push_str(&format!("export {key}={}\n", sh_single_quote(val)));
        }
    }
    let workdir = cfg
        .get("WorkingDir")
        .and_then(|v| v.as_str())
        .filter(|s| !s.is_empty());
    if let Some(dir) = workdir {
        script.push_str(&format!("cd {} 2>/dev/null || true\n", sh_single_quote(dir)));
    }
    let quoted: Vec<String> = argv.iter().map(|arg| sh_single_quote(arg)).collect();
    script.push_str(&format!("exec {}\n", quoted.join(" ")));
    Ok(Some(script))
}
/// Create (or reuse, if it already exists) the directory
/// `<temp_dir>/<prefix>-<pid>-<n>` and return it.
///
/// `n` comes from the process-wide `TEMP_COUNTER`, so each call in a process
/// gets a fresh name.
fn temp_work_dir(prefix: &str) -> Result<PathBuf, String> {
    let seq = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
    let dir_name = format!("{prefix}-{}-{seq}", std::process::id());
    let path = std::env::temp_dir().join(dir_name);
    match std::fs::create_dir_all(&path) {
        Ok(()) => Ok(path),
        Err(e) => Err(format!("create temp dir {}: {e}", path.display())),
    }
}
/// Drop one leading `sha256:` prefix, if present, and return an owned copy.
fn strip_sha256(s: &str) -> String {
    match s.strip_prefix("sha256:") {
        Some(hex) => hex.to_owned(),
        None => s.to_owned(),
    }
}
/// Turn an OCI digest into a single path component, rejecting traversal.
///
/// The `sha256:` prefix is stripped. The result must be non-empty and contain
/// neither `/` nor `..`. No further validation (e.g. hex-only) is performed.
fn sha256_path_component(digest: &str) -> Result<String, String> {
    let sha = strip_sha256(digest);
    let unsafe_component = sha.is_empty() || sha.contains('/') || sha.contains("..");
    if unsafe_component {
        Err(format!("unsafe OCI digest (path traversal): {digest:?}"))
    } else {
        Ok(sha)
    }
}
/// Join a manifest-supplied relative path onto `root`.
///
/// The path is refused when it is absolute or contains a `..` or root
/// component, so the result cannot escape `root` lexically.
fn confined_join(root: &Path, rel: &str) -> Result<PathBuf, String> {
    use std::path::Component;
    let candidate = Path::new(rel);
    if !candidate.is_absolute() {
        let stays_inside = candidate
            .components()
            .all(|part| !matches!(part, Component::ParentDir | Component::RootDir));
        if stays_inside {
            return Ok(root.join(candidate));
        }
    }
    Err(format!("unsafe manifest path (traversal): {rel:?}"))
}
/// Parse `HOST:TAG:GUEST[:SYMLINKS]` into a JSON mount object.
///
/// The object has keys `host_path`, `guest_tag`, `guest_path` and, when the
/// fourth field is present, `symlinks`. Returns `None` for fewer than three
/// fields or an empty tag/guest path. The fourth field keeps any further `:`.
fn parse_mount_arg(raw: &str) -> Option<serde_json::Value> {
    let mut fields = raw.splitn(4, ':');
    let host_path = fields.next()?;
    let guest_tag = fields.next()?;
    let guest_path = fields.next()?;
    let symlinks = fields.next();
    if guest_tag.is_empty() || guest_path.is_empty() {
        return None;
    }
    let mut obj = serde_json::Map::new();
    for (key, value) in [
        ("host_path", host_path),
        ("guest_tag", guest_tag),
        ("guest_path", guest_path),
    ] {
        obj.insert(key.to_owned(), value.into());
    }
    if let Some(links) = symlinks {
        obj.insert("symlinks".to_owned(), links.into());
    }
    Some(serde_json::Value::Object(obj))
}
/// Collect the value that follows each occurrence of `flag` (`--flag VALUE`).
///
/// A consumed value is never itself re-examined as a flag, and a trailing
/// `flag` with no value after it is ignored.
fn arg_values<'a>(args: &'a [String], flag: &str) -> Vec<&'a str> {
    let mut values = Vec::new();
    let mut iter = args.iter();
    while let Some(arg) = iter.next() {
        if arg == flag {
            if let Some(value) = iter.next() {
                values.push(value.as_str());
            }
        }
    }
    values
}
/// Whether any argument equals `flag` exactly.
fn arg_present(args: &[String], flag: &str) -> bool {
    args.iter().map(String::as_str).any(|arg| arg == flag)
}
/// Value of the last `flag VALUE` pair in `args`, if any (last one wins).
fn arg_value<'a>(args: &'a [String], flag: &str) -> Option<&'a str> {
    let mut values = arg_values(args, flag);
    values.pop()
}
/// SHA-256 of a file's contents as lowercase hex, memoized per process.
///
/// Caching behavior:
/// - Results are keyed by (canonical path, length, mtime in ns), so a file
///   rewritten with identical length and mtime returns the cached digest.
/// - The cache holds at most 1024 entries; when full, an arbitrary entry is
///   evicted before inserting.
/// - A poisoned lock simply bypasses the cache.
///
/// # Errors
/// Canonicalize, `stat`, mtime, open or read failures, each labelled with the
/// step and path.
fn sha256_file(path: &Path) -> Result<String, String> {
    use ring::digest::{Context, SHA256};
    use std::collections::HashMap;
    use std::io::Read;
    use std::sync::RwLock;
    type Key = (PathBuf, u64, u128);
    static CACHE: OnceLock<RwLock<HashMap<Key, String>>> = OnceLock::new();
    const CACHE_CAP: usize = 1024;

    let canon = path
        .canonicalize()
        .map_err(|e| format!("canonicalize {}: {e}", path.display()))?;
    let meta = std::fs::metadata(&canon).map_err(|e| format!("stat {}: {e}", canon.display()))?;
    let modified = meta
        .modified()
        .map_err(|e| format!("mtime {}: {e}", canon.display()))?;
    let mtime_ns = modified
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| d.as_nanos());
    let key: Key = (canon, meta.len(), mtime_ns);
    let cache = CACHE.get_or_init(Default::default);
    let cached = cache.read().ok().and_then(|map| map.get(&key).cloned());
    if let Some(hex) = cached {
        return Ok(hex);
    }

    let canon = &key.0;
    let mut file =
        std::fs::File::open(canon).map_err(|e| format!("open {}: {e}", canon.display()))?;
    let mut ctx = Context::new(&SHA256);
    let mut chunk = [0u8; 64 * 1024];
    loop {
        let read = file
            .read(&mut chunk)
            .map_err(|e| format!("read {}: {e}", canon.display()))?;
        if read == 0 {
            break;
        }
        ctx.update(&chunk[..read]);
    }
    let hex: String = ctx
        .finish()
        .as_ref()
        .iter()
        .map(|byte| format!("{byte:02x}"))
        .collect();

    if let Ok(mut map) = cache.write() {
        if map.len() >= CACHE_CAP {
            if let Some(victim) = map.keys().next().cloned() {
                map.remove(&victim);
            }
        }
        map.insert(key, hex.clone());
    }
    Ok(hex)
}
/// Lowercase-hex SHA-256 of `text`'s UTF-8 bytes. The `Result` is always `Ok`.
fn sha256_text(text: &str) -> Result<String, String> {
    use ring::digest::{digest, SHA256};
    use std::fmt::Write as _;
    let hash = digest(&SHA256, text.as_bytes());
    let hex = hash
        .as_ref()
        .iter()
        .fold(String::with_capacity(64), |mut acc, byte| {
            let _ = write!(acc, "{byte:02x}");
            acc
        });
    Ok(hex)
}
/// Generate a random token: 32 bytes from ring's `SystemRandom`, returned as
/// 64 lowercase hex characters.
///
/// # Errors
/// Fails if the system RNG cannot fill the buffer.
fn generate_tsi_token_hex() -> Result<String, String> {
    use ring::rand::SecureRandom;
    use std::fmt::Write as _;
    let mut bytes = [0u8; 32];
    ring::rand::SystemRandom::new()
        .fill(&mut bytes)
        .map_err(|_| "ring::rand::SystemRandom::fill failed".to_owned())?;
    let mut token = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        let _ = write!(token, "{byte:02x}");
    }
    Ok(token)
}
/// Permission bits (including setuid/setgid/sticky, mask `0o7777`) of `path`,
/// formatted in octal without a leading zero (e.g. `"644"`).
fn file_mode_octal(path: &Path) -> Result<String, String> {
    let meta = std::fs::metadata(path).map_err(|e| format!("stat {}: {e}", path.display()))?;
    let perm_bits = meta.permissions().mode() & 0o7777;
    Ok(format!("{perm_bits:o}"))
}
/// Cheap change fingerprint of a file: `"<length>:<mtime in whole seconds since epoch>"`.
fn file_size_mtime(path: &Path) -> Result<String, String> {
    let meta = std::fs::metadata(path).map_err(|e| format!("stat {}: {e}", path.display()))?;
    let modified = meta
        .modified()
        .map_err(|e| format!("mtime {}: {e}", path.display()))?;
    let since_epoch = modified
        .duration_since(UNIX_EPOCH)
        .map_err(|e| format!("mtime before epoch {}: {e}", path.display()))?;
    Ok(format!("{}:{}", meta.len(), since_epoch.as_secs()))
}
/// Set `path`'s Unix permission mode to `mode` (equivalent of `chmod`).
fn set_mode(path: &Path, mode: u32) -> Result<(), String> {
    let meta = std::fs::metadata(path).map_err(|e| format!("stat {}: {e}", path.display()))?;
    let mut perms = meta.permissions();
    perms.set_mode(mode);
    if let Err(e) = std::fs::set_permissions(path, perms) {
        return Err(format!("chmod {:o} {}: {e}", mode, path.display()));
    }
    Ok(())
}
/// Write `lines` to `path`, each terminated by `\n`, replacing any existing
/// content. An empty slice produces an empty file.
fn write_lines(path: &Path, lines: &[String]) -> Result<(), String> {
    let text: String = lines.iter().map(|line| format!("{line}\n")).collect();
    std::fs::write(path, text).map_err(|e| format!("write {}: {e}", path.display()))
}
/// Directory for cached kernel-boot snapshots.
///
/// Uses `$SUPERMACHINE_KERNEL_BOOT_CACHE_DIR` when set, otherwise
/// `$HOME/.local/share/supermachine/kernel-boot-cache`, with `.` standing in
/// for an unset `HOME`.
fn kernel_boot_cache_dir() -> PathBuf {
    match std::env::var_os("SUPERMACHINE_KERNEL_BOOT_CACHE_DIR") {
        Some(dir) => PathBuf::from(dir),
        None => std::env::var_os("HOME")
            .map_or_else(|| PathBuf::from("."), PathBuf::from)
            .join(".local/share/supermachine/kernel-boot-cache"),
    }
}
/// Hex SHA-256 key identifying a reusable kernel-boot snapshot.
///
/// The key covers the `size:mtime` fingerprints of the kernel, `init-oci`
/// (built on demand by `ensure_init_oci`) and the worker binary, plus
/// `plan.memory_mib` and `plan.vcpus`.
pub(crate) fn kernel_boot_cache_key(plan: &BakePlan<'_>, root: &Path) -> Result<String, String> {
    // Resolve all three artifacts first, in the same order as before, so an
    // `init-oci` build failure takes precedence over fingerprint errors.
    let kernel = supermachine_kernel(root);
    let init_oci = ensure_init_oci(root)?;
    let worker_bin = supermachine_worker_bin(root);
    let kernel_sig = file_size_mtime(&kernel)?;
    let init_sig = file_size_mtime(&init_oci)?;
    let worker_sig = file_size_mtime(&worker_bin)?;
    let fingerprint = format!(
        "kernel={kernel_sig}\ninit_oci={init_sig}\nworker_bin={worker_sig}\nmemory_mib={}\nvcpus={}\n",
        plan.memory_mib, plan.vcpus,
    );
    sha256_text(&fingerprint)
}
/// Path of the cached kernel-boot snapshot:
/// `<kernel_boot_cache_dir>/<key>_mib<memory_mib>.snap`.
pub(crate) fn kernel_boot_cache_snap_path(
    plan: &BakePlan<'_>,
    root: &Path,
) -> Result<PathBuf, String> {
    let file_name = format!(
        "{}_mib{}.snap",
        kernel_boot_cache_key(plan, root)?,
        plan.memory_mib
    );
    Ok(kernel_boot_cache_dir().join(file_name))
}
/// Path of the cache-key sidecar next to a kernel-boot snapshot:
/// `<kernel_boot_cache_dir>/<key>_mib<memory_mib>.cache-key`.
// Currently unreferenced; kept as part of the cache-path API.
#[allow(dead_code)]
pub(crate) fn kernel_boot_cache_key_path(
    plan: &BakePlan<'_>,
    root: &Path,
) -> Result<PathBuf, String> {
    let file_name = format!(
        "{}_mib{}.cache-key",
        kernel_boot_cache_key(plan, root)?,
        plan.memory_mib
    );
    Ok(kernel_boot_cache_dir().join(file_name))
}
/// Locate or build the `init-oci` guest init binary.
///
/// Search and build order:
/// 1. Pick the `oci` directory under `root`: `crates/supermachine/oci`, then
///    `share/supermachine/oci`, defaulting to the former.
/// 2. Reuse an existing `init-oci` there if it is executable and not older
///    than `init-oci.c` (or if there is no source to compare against).
/// 3. Without a source file, fall back to the bundled asset from
///    `crate::assets`.
/// 4. With a source file, rebuild via `zig cc` as a static `-O2`
///    `aarch64-linux-musl` binary.
///
/// # Errors
/// Fails if the `zig` build fails or no binary can be found or produced.
fn ensure_init_oci(root: &Path) -> Result<PathBuf, String> {
    let dev_dir = root.join("crates/supermachine/oci");
    let dir = [dev_dir.clone(), root.join("share/supermachine/oci")]
        .into_iter()
        .find(|candidate| candidate.is_dir())
        .unwrap_or(dev_dir);
    let src = dir.join("init-oci.c");
    let bin = dir.join("init-oci");

    if bin.is_file() {
        let bin_meta = std::fs::metadata(&bin).ok();
        let executable = bin_meta
            .as_ref()
            .is_some_and(|m| m.permissions().mode() & 0o111 != 0);
        let up_to_date = match std::fs::metadata(&src) {
            // No source to compare against: trust the existing binary.
            Err(_) => true,
            Ok(src_meta) => {
                let bin_mtime = bin_meta.as_ref().and_then(|m| m.modified().ok());
                matches!(
                    (bin_mtime, src_meta.modified().ok()),
                    (Some(b), Some(s)) if b >= s
                )
            }
        };
        if executable && up_to_date {
            return Ok(bin);
        }
    }

    if src.is_file() {
        let mut cmd = Command::new("zig");
        cmd.current_dir(&dir)
            .args([
                "cc",
                "--target=aarch64-linux-musl",
                "-static",
                "-O2",
                "-o",
                "init-oci",
                "init-oci.c",
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        run_status(cmd, "build init-oci")?;
        if bin.is_file() {
            return Ok(bin);
        }
    } else if let Some(bundled) = crate::assets::AssetPaths::discover().init_oci_bin {
        return Ok(bundled);
    }
    Err(format!(
        "missing init-oci: not found in dev tree ({}), bundled assets, or via zig build",
        bin.display()
    ))
}
fn ensure_supermachine_agent(root: &Path) -> Result<PathBuf, String> {
let installed = root.join("share/supermachine/supermachine-agent");
if installed.is_file() {
return Ok(installed);
}
let musl_target = match std::env::consts::ARCH {
"aarch64" => "aarch64-unknown-linux-musl",
"x86_64" => "x86_64-unknown-linux-musl",
other => return Err(format!("supermachine-agent: unsupported host arch {other}")),
};
let crate_dir = root.join("crates/supermachine-guest-agent");
let dev_bin = crate_dir.join(format!("target/{musl_target}/release/supermachine-agent"));
if dev_bin.is_file() {
let src = crate_dir.join("src/main.rs");
let up_to_date = match (
std::fs::metadata(&dev_bin).ok(),
std::fs::metadata(&src).ok(),
) {
(Some(b), Some(s)) => b.modified().ok() >= s.modified().ok(),
(Some(_), None) => true,
_ => false,
};
if up_to_date {
return Ok(dev_bin);
}
}
if !crate_dir.is_dir() {
let assets = crate::assets::AssetPaths::discover();
if let Some(p) = assets.supermachine_agent {
return Ok(p);
}
return Err(format!(
"supermachine-agent: not found at {} (release tarball), \
{} (dev tree), or in the bundled assets from supermachine-kernel",
installed.display(),
dev_bin.display()
));
}
let path = std::env::var("PATH").unwrap_or_default();
let new_path = format!("{}:{}", crate_dir.display(), path);
let mut cmd = Command::new("cargo");
cmd.current_dir(&crate_dir)
.arg("build")
.arg("--release")
.arg(format!("--target={musl_target}"))
.env("PATH", new_path)
.stdout(Stdio::null())
.stderr(Stdio::piped());
run_status(cmd, "build supermachine-agent")?;
if !dev_bin.is_file() {
return Err(format!(
"supermachine-agent build did not produce {}",
dev_bin.display()
));
}
Ok(dev_bin)
}
/// Produce `out_dir/init.cpio.gz`, the initramfs that carries `init-oci`.
///
/// Caching: the archive is built once into `init-oci.cpio.gz` next to the
/// `init-oci` binary. It is rebuilt when it is empty, or older than
/// `init-oci`, or older than `supermachine-smpark.ko` (when that module is
/// available from the dev tree or the bundled assets).
///
/// A rebuild:
/// - stages `init` (0755), the `proc`, `sys`, `dev` and `newroot`
///   directories, and the module if present (0644) in a temp dir;
/// - archives them as newc cpio + gzip;
/// - publishes the result by atomic rename.
///
/// The cache is then hard-linked (or copied, if linking fails) into `out_dir`.
/// Returns the output path and the elapsed milliseconds.
///
/// # Errors
/// Fails in any of these cases:
/// - `init-oci` cannot be obtained;
/// - staging or archiving fails, including a missing or failing `cpio`, which
///   is detected rather than cached as an empty archive;
/// - the result cannot be placed in `out_dir`.
fn build_initramfs(root: &Path, out_dir: &Path) -> Result<(PathBuf, u128), String> {
    let t0 = Instant::now();
    let init = ensure_init_oci(root)?;
    let init_dir = init
        .parent()
        .map(|p| p.to_path_buf())
        .unwrap_or_else(|| root.join("crates/supermachine/oci"));
    let cache = init_dir.join("init-oci.cpio.gz");
    let smpark_ko_src = {
        let dev_tree = init_dir.join("supermachine-smpark.ko");
        if dev_tree.is_file() {
            dev_tree
        } else {
            crate::assets::AssetPaths::discover()
                .smpark_ko
                .unwrap_or(dev_tree)
        }
    };
    let smpark_present = smpark_ko_src.is_file();
    let rebuild = match (std::fs::metadata(&cache), std::fs::metadata(&init)) {
        (Ok(cache_meta), Ok(init_meta)) => {
            if cache_meta.len() == 0 {
                true
            } else {
                let init_newer = cache_meta.modified().ok() < init_meta.modified().ok();
                let smpark_newer = if smpark_present {
                    std::fs::metadata(&smpark_ko_src)
                        .ok()
                        .and_then(|m| m.modified().ok())
                        .map(|smt| {
                            cache_meta
                                .modified()
                                .ok()
                                .map(|ct| ct < smt)
                                .unwrap_or(true)
                        })
                        .unwrap_or(false)
                } else {
                    false
                };
                init_newer || smpark_newer
            }
        }
        _ => true,
    };
    if rebuild {
        let stage = temp_work_dir("supermachine-initramfs-stage")?;
        let tmp_seq = TEMP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let cache_tmp = init_dir.join(format!(
            ".init-oci.cpio.gz.{}.{}.tmp",
            std::process::id(),
            tmp_seq
        ));
        // Uncompressed archive written by the script below as "$2.cpio".
        let cpio_scratch = {
            let mut p = cache_tmp.clone().into_os_string();
            p.push(".cpio");
            PathBuf::from(p)
        };
        let _ = std::fs::remove_file(&cache_tmp);
        let result: Result<(), String> = (|| {
            std::fs::copy(&init, stage.join("init"))
                .map_err(|e| format!("copy init into initramfs stage: {e}"))?;
            set_mode(&stage.join("init"), 0o755)?;
            for dir in ["proc", "sys", "dev", "newroot"] {
                std::fs::create_dir_all(stage.join(dir))
                    .map_err(|e| format!("create initramfs dir {dir}: {e}"))?;
            }
            if smpark_present {
                std::fs::copy(&smpark_ko_src, stage.join("supermachine-smpark.ko")).map_err(
                    |e| format!("copy supermachine-smpark.ko into initramfs stage: {e}"),
                )?;
                set_mode(&stage.join("supermachine-smpark.ko"), 0o644)?;
            }
            // Archive to a scratch file first so cpio's own exit status is
            // checked. In `cpio | gzip` only gzip's status is reported, so a
            // missing or failing cpio (its stderr is silenced) would otherwise
            // cache an empty gzip as a valid initramfs.
            let script = "cd \"$1\" || exit 1\n\
                find . -mindepth 1 -print | sed 's|^\\./||' | cpio -o -H newc > \"$2.cpio\" 2>/dev/null\n\
                rc=$?\n\
                if [ \"$rc\" -eq 0 ]; then gzip < \"$2.cpio\" > \"$2\"; rc=$?; fi\n\
                rm -f \"$2.cpio\"\n\
                exit \"$rc\"\n";
            let mut cmd = Command::new("sh");
            cmd.arg("-c")
                .arg(script)
                .arg("sh")
                .arg(&stage)
                .arg(&cache_tmp)
                .stdout(Stdio::null());
            run_status(cmd, "build initramfs cpio")?;
            std::fs::rename(&cache_tmp, &cache).map_err(|e| {
                let _ = std::fs::remove_file(&cache_tmp);
                format!(
                    "rename initramfs cpio {} -> {}: {e}",
                    cache_tmp.display(),
                    cache.display()
                )
            })?;
            Ok(())
        })();
        let _ = std::fs::remove_dir_all(&stage);
        let _ = std::fs::remove_file(&cache_tmp);
        let _ = std::fs::remove_file(&cpio_scratch);
        result?;
    }
    let out = out_dir.join("init.cpio.gz");
    let _ = std::fs::remove_file(&out);
    if std::fs::hard_link(&cache, &out).is_err() {
        std::fs::copy(&cache, &out).map_err(|e| {
            format!(
                "copy initramfs cache {} -> {}: {e}",
                cache.display(),
                out.display()
            )
        })?;
    }
    Ok((out, elapsed_ms(t0)))
}
/// Write `out_dir/env.json` (pretty-printed output of `env_json_value`) and
/// return its path.
fn write_env_json(
    plan: &BakePlan<'_>,
    resolution: &ImageResolution,
    out_dir: &Path,
) -> Result<PathBuf, String> {
    let path = out_dir.join("env.json");
    let bytes = serde_json::to_vec_pretty(&env_json_value(plan, resolution))
        .map_err(|e| format!("encode env.json: {e}"))?;
    std::fs::write(&path, bytes).map_err(|e| format!("write {}: {e}", path.display()))?;
    Ok(path)
}
/// Build `{"env": {...}, "secrets": {}}` for the guest.
///
/// Image `Env` entries are merged with `--env K=V` flags; later entries win,
/// so CLI flags override the image. Entries without `=` are ignored.
fn env_json_value(plan: &BakePlan<'_>, resolution: &ImageResolution) -> serde_json::Value {
    let image_env = resolution.env.iter().map(|line| &line[..]);
    let cli_env = arg_values(plan.extra_args, "--env").into_iter();
    let env: serde_json::Map<String, serde_json::Value> = image_env
        .chain(cli_env)
        .filter_map(|line| line.split_once('='))
        .map(|(k, v)| (k.to_owned(), serde_json::Value::String(v.to_owned())))
        .collect();
    serde_json::json!({ "env": env, "secrets": {} })
}
/// Read a whole file as text, substituting U+FFFD for invalid UTF-8. Returns
/// an empty string if the file cannot be read.
fn read_to_string_lossy(path: &Path) -> String {
    match std::fs::read(path) {
        Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
        Err(_) => String::new(),
    }
}
/// Return the buffered text of a log file that may still be growing.
///
/// Caching:
/// - A process-wide cache stores (bytes consumed, text so far) per path, so
///   each call reads only what was appended since the previous call.
/// - The cache holds at most 64 paths; an arbitrary entry is evicted when
///   full.
/// - If the file shrank, its entry is reset and the file is re-read from the
///   start.
/// - Once the buffered text exceeds 1 MiB it is trimmed to roughly the newest
///   512 KiB.
///
/// Fallbacks: a poisoned cache mutex means the whole file is read directly.
/// If the file cannot be stat'ed or opened, the previously buffered text is
/// returned.
fn log_tail_get_text(path: &Path) -> String {
    use std::collections::HashMap;
    use std::io::{Read, Seek, SeekFrom};
    use std::sync::Mutex;
    static TAILS: OnceLock<Mutex<HashMap<PathBuf, (u64, String)>>> = OnceLock::new();
    const MAX_BUF: usize = 1024 * 1024;
    const MAX_ENTRIES: usize = 64;
    let key = path.to_path_buf();
    let tails = TAILS.get_or_init(|| Mutex::new(HashMap::new()));
    let mut g = match tails.lock() {
        Ok(g) => g,
        Err(_) => return read_to_string_lossy(path),
    };
    if g.len() >= MAX_ENTRIES && !g.contains_key(&key) {
        if let Some(k) = g.keys().next().cloned() {
            g.remove(&k);
        }
    }
    let entry = g.entry(key).or_insert_with(|| (0u64, String::new()));
    let cur_len = match std::fs::metadata(path) {
        Ok(m) => m.len(),
        Err(_) => return entry.1.clone(),
    };
    if cur_len < entry.0 {
        // File was truncated/rotated: start over.
        entry.0 = 0;
        entry.1.clear();
    }
    if cur_len > entry.0 {
        match std::fs::File::open(path) {
            Ok(mut f) => {
                if f.seek(SeekFrom::Start(entry.0)).is_ok() {
                    let mut new_bytes: Vec<u8> = Vec::new();
                    if f.read_to_end(&mut new_bytes).is_ok() {
                        // Advance by what was actually read. The writer may have
                        // appended past `cur_len` since the stat; recording
                        // `cur_len` would re-read and duplicate that tail.
                        entry.0 += new_bytes.len() as u64;
                        entry.1.push_str(&String::from_utf8_lossy(&new_bytes));
                        if entry.1.len() > MAX_BUF {
                            // `split_off` panics mid-character, so round the cut
                            // forward to the next UTF-8 boundary.
                            let mut drop_to = entry.1.len() - MAX_BUF / 2;
                            while !entry.1.is_char_boundary(drop_to) {
                                drop_to += 1;
                            }
                            entry.1 = entry.1.split_off(drop_to);
                        }
                    }
                }
            }
            Err(_) => return entry.1.clone(),
        }
    }
    entry.1.clone()
}
/// Whether the buffered log text at `path` contains `needle` anywhere.
fn log_has(path: &Path, needle: &str) -> bool {
    let text = log_tail_get_text(path);
    text.contains(needle)
}
/// Whether any buffered log line looks like a failure.
///
/// A line counts as a failure when it:
/// - starts with `FATAL:` or `error:`;
/// - mentions `panic`; or
/// - mentions both `snapshot` and `ERR`.
fn log_has_failure(path: &Path) -> bool {
    let is_failure_line = |line: &str| {
        line.starts_with("FATAL:")
            || line.starts_with("error:")
            || line.contains("panic")
            || (line.contains("snapshot") && line.contains("ERR"))
    };
    log_tail_get_text(path).lines().any(is_failure_line)
}
/// Substrings of guest console output that mark the start of a kernel panic
/// or oops report.
const KERNEL_PANIC_PATTERNS: &[&str] = &[
    "Kernel panic",
    "Unable to handle kernel NULL pointer dereference",
    "Unable to handle kernel paging request",
    "Internal error: Oops",
    "Attempted to kill the idle task",
];
/// Find the first line matching a `KERNEL_PANIC_PATTERNS` entry in the log.
///
/// Returns that line together with up to `PANIC_STACK_LINES` lines that
/// follow it.
fn detect_kernel_panic(path: &Path) -> Option<(String, Vec<String>)> {
    let text = log_tail_get_text(path);
    let lines: Vec<&str> = text.lines().collect();
    let hit = lines
        .iter()
        .position(|line| KERNEL_PANIC_PATTERNS.iter().any(|pat| line.contains(pat)))?;
    let stack: Vec<String> = lines[hit + 1..]
        .iter()
        .take(PANIC_STACK_LINES)
        .map(|s| s.to_string())
        .collect();
    Some((lines[hit].to_owned(), stack))
}
/// Maximum number of lines kept after a detected kernel-panic line.
const PANIC_STACK_LINES: usize = 40;
/// Encode a detected panic as `KERNEL_PANIC|<first line>|<stack>`.
///
/// The stack lines are joined with the ASCII unit separator (0x1F).
fn encode_kernel_panic_err(first_line: &str, stack: &[String]) -> String {
    const UNIT_SEP: &str = "\x1F";
    let mut encoded = format!("KERNEL_PANIC|{first_line}|");
    encoded.push_str(&stack.join(UNIT_SEP));
    encoded
}
/// Return the last buffered log line that contains `needle`, if any.
fn last_line_containing(path: &Path, needle: &str) -> Option<String> {
    log_tail_get_text(path)
        .lines()
        .rev()
        .find(|line| line.contains(needle))
        .map(str::to_owned)
}
/// Extract `REASON` from a line of the form `... snapshot (REASON): ...`.
///
/// Uses the first `snapshot (` and the first `):` after it.
fn parse_snapshot_reason(line: &str) -> Option<String> {
    let (_, after_open) = line.split_once("snapshot (")?;
    let (reason, _) = after_open.split_once("):")?;
    Some(reason.to_owned())
}
/// Parse `capture N us` and `save M us` microsecond figures from a log line.
///
/// Each value is `None` if its marker is missing or the number does not parse.
fn parse_capture_save_us(line: &str) -> (Option<u64>, Option<u64>) {
    let number_after = |marker: &str| -> Option<u64> {
        let rest = line.split(marker).nth(1)?;
        rest.split(" us").next()?.trim().parse().ok()
    };
    (number_after("capture "), number_after("save "))
}
/// Parse `data N MiB` and `zero M MiB` figures from a log line.
///
/// If `data ` does not appear at all, both values are `None`. Otherwise each
/// value is parsed independently.
fn parse_ram_mib(line: &str) -> (Option<u64>, Option<u64>) {
    if !line.contains("data ") {
        return (None, None);
    }
    let mib_after = |marker: &str| -> Option<u64> {
        let rest = line.split(marker).nth(1)?;
        rest.split(" MiB").next()?.trim().parse().ok()
    };
    (mib_after("data "), mib_after("zero "))
}
/// On-disk size of `path` as reported by `du -sk`, converted from KiB to bytes.
///
/// Returns `None` if `du` fails or its output cannot be parsed.
fn file_physical_bytes(path: &Path) -> Option<u64> {
    let mut du = Command::new("du");
    du.arg("-sk").arg(path);
    let report = command_output(du, "du -sk").ok()?;
    let kib: u64 = report.split_whitespace().next()?.parse().ok()?;
    Some(kib * 1024)
}
/// Current UTC time formatted as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// The time is formatted in-process from the system clock instead of
/// spawning `date -u`. This removes the dependency on a `date` binary and the
/// cost of a fork/exec, and produces the same text.
///
/// # Errors
/// Fails only if the system clock reports a time before the Unix epoch.
fn now_utc_iso() -> Result<String, String> {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| format!("date utc: clock before Unix epoch: {e}"))?
        .as_secs();
    Ok(format_utc_iso_secs(secs))
}

/// Format seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// Uses the proleptic Gregorian calendar and Howard Hinnant's days-to-civil
/// conversion.
fn format_utc_iso_secs(secs: u64) -> String {
    let days = secs / 86_400;
    let rem = secs % 86_400;
    let (hour, minute, second) = (rem / 3_600, rem % 3_600 / 60, rem % 60);
    // Shift the epoch to 0000-03-01 so leap days fall at the end of each year.
    let z = days + 719_468;
    let era = z / 146_097;
    let doe = z % 146_097; // day of 400-year era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // March-based month index, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = era * 400 + yoe + u64::from(month <= 2);
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
/// Listener settle delay in milliseconds.
///
/// Sources, in priority order:
/// 1. the last `--supermachine-listener-settle-ms` flag;
/// 2. `$SUPERMACHINE_LISTENER_SETTLE_MS`;
/// 3. the default of 50.
///
/// An unparsable value falls through to the next source.
fn native_listener_settle_ms(plan: &BakePlan<'_>) -> u64 {
    const DEFAULT_SETTLE_MS: u64 = 50;
    let parse = |s: &str| s.parse::<u64>().ok();
    if let Some(ms) =
        arg_value(plan.extra_args, "--supermachine-listener-settle-ms").and_then(parse)
    {
        return ms;
    }
    std::env::var("SUPERMACHINE_LISTENER_SETTLE_MS")
        .ok()
        .and_then(|s| parse(&s))
        .unwrap_or(DEFAULT_SETTLE_MS)
}
/// Guest memory, in MiB, that the balloon must always leave available.
const MIN_FREE_MIB_AFTER_BALLOON: u64 = 96;
/// Number of 4 KiB pages in one MiB.
const PAGES_PER_MIB: u64 = 256;

/// Number of pages the balloon should reclaim from a `memory_mib` guest.
///
/// The result is the smaller of:
/// - 70% of the guest's pages;
/// - everything above `MIN_FREE_MIB_AFTER_BALLOON`.
///
/// Guests at or below that floor get `0`.
pub(crate) fn compute_balloon_target_pages(memory_mib: u32) -> u64 {
    let total_mib = u64::from(memory_mib);
    let cap_70pct = total_mib * PAGES_PER_MIB * 70 / 100;
    let max_reclaim = total_mib
        .saturating_sub(MIN_FREE_MIB_AFTER_BALLOON)
        .saturating_mul(PAGES_PER_MIB);
    cap_70pct.min(max_reclaim)
}
#[cfg_attr(
not(all(target_os = "linux", target_arch = "x86_64")),
allow(dead_code)
)]
pub(crate) fn build_kvm_rootfs_squashfs(
layer_blobs: &[PathBuf],
work_root: &Path,
out_squashfs: &Path,
workload_script: Option<&str>,
) -> Result<(), String> {
std::fs::create_dir_all(work_root)
.map_err(|e| format!("create kvm rootfs dir {}: {e}", work_root.display()))?;
let t_extract = std::time::Instant::now();
for blob in layer_blobs {
extract_layer_tar(blob, work_root)?;
}
remove_oci_whiteouts(work_root)?;
let extract_ms = t_extract.elapsed().as_millis();
let t_squash = std::time::Instant::now();
if let Some(script) = workload_script {
let sm_dir = work_root.join(".supermachine");
std::fs::create_dir_all(&sm_dir)
.map_err(|e| format!("create {} : {e}", sm_dir.display()))?;
let run = sm_dir.join("run-workload");
std::fs::write(&run, script.as_bytes())
.map_err(|e| format!("write {}: {e}", run.display()))?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt as _;
std::fs::set_permissions(&run, std::fs::Permissions::from_mode(0o755))
.map_err(|e| format!("chmod {}: {e}", run.display()))?;
}
}
let r = squashfs::write_squashfs(work_root, out_squashfs, &squashfs::Ownership::AllRoot);
if trace_enabled() {
let sz = std::fs::metadata(out_squashfs)
.map(|m| m.len())
.unwrap_or(0);
eprintln!(
"supermachine: kvm rootfs build extract_ms={} squashfs_ms={} layers={} squashfs_bytes={}",
extract_ms,
t_squash.elapsed().as_millis(),
layer_blobs.len(),
sz,
);
}
r
}
#[cfg_attr(
    not(all(target_os = "linux", target_arch = "x86_64")),
    allow(dead_code)
)]
/// Pull `image` for `arch` and resolve its layer blobs, in order, to local files.
///
/// The image is saved as an OCI layout under `work_dir`. For each layer
/// digest, the shared registry blob cache is preferred over the saved layout.
/// Also returns the generated workload script (see `kvm_workload_script`).
///
/// # Errors
/// Fails if the pull/save/inspect steps fail or a layer blob is in neither
/// location.
pub(crate) fn pull_oci_layers(
    image: &str,
    arch: &str,
    work_dir: &Path,
) -> Result<(Vec<PathBuf>, Option<String>), String> {
    std::fs::create_dir_all(work_dir)
        .map_err(|e| format!("create oci pull dir {}: {e}", work_dir.display()))?;
    let source = select_image_source(image, arch)?;
    source.pull(image, false)?;
    let layout = source.save(image, work_dir)?;
    let workload = kvm_workload_script(image, &layout, arch)?;
    let shas = layer_shas_from_save_dir(image, &layout, arch)?;

    let mut blobs: Vec<PathBuf> = Vec::new();
    for sha in shas.iter() {
        let cached = registry_blob_cache_path(sha)?;
        if cached.is_file() {
            blobs.push(cached);
            continue;
        }
        let in_layout = blob_path(&layout, sha)?;
        if !in_layout.is_file() {
            return Err(format!(
                "layer blob {sha} not found in cache ({}) or layout ({})",
                cached.display(),
                in_layout.display()
            ));
        }
        blobs.push(in_layout);
    }
    Ok((blobs, workload))
}
/// One member of a `newc` cpio archive produced by `build_newc_cpio`.
pub(crate) enum CpioEntry<'a> {
    /// Regular file: (path inside the archive, permission bits, contents).
    File(&'a str, u32, &'a [u8]),
    /// Directory: (path inside the archive, permission bits).
    Dir(&'a str, u32),
}

#[cfg_attr(
    not(all(target_os = "linux", target_arch = "x86_64")),
    allow(dead_code)
)]
/// Serialize `entries` into an in-memory `newc` ("070701") cpio archive.
///
/// Header fields:
/// - inodes are numbered from 1 in entry order;
/// - uid/gid/mtime/device fields are zero;
/// - files get nlink 1 and directories nlink 2.
///
/// Names and data are NUL/zero-padded to 4-byte alignment, and the archive
/// ends with the `TRAILER!!!` record.
pub(crate) fn build_newc_cpio(entries: &[CpioEntry<'_>]) -> Vec<u8> {
    const S_IFREG: u32 = 0o100000;
    const S_IFDIR: u32 = 0o040000;
    const MAGIC: &[u8] = b"070701";
    const TRAILER: &str = "TRAILER!!!";

    /// Zero-pad `buf` up to the next multiple of 4 bytes.
    fn align4(buf: &mut Vec<u8>) {
        let padding = (4 - buf.len() % 4) % 4;
        buf.resize(buf.len() + padding, 0);
    }

    /// Append one header + name + data record.
    fn push_member(buf: &mut Vec<u8>, ino: u32, name: &str, mode: u32, nlink: u32, data: &[u8]) {
        // newc header fields, in order: ino, mode, uid, gid, nlink, mtime,
        // filesize, devmajor, devminor, rdevmajor, rdevminor, namesize, check.
        let header: [u32; 13] = [
            ino,
            mode,
            0,
            0,
            nlink,
            0,
            data.len() as u32,
            0,
            0,
            0,
            0,
            name.len() as u32 + 1,
            0,
        ];
        buf.extend_from_slice(MAGIC);
        for value in header {
            buf.extend_from_slice(format!("{value:08x}").as_bytes());
        }
        buf.extend_from_slice(name.as_bytes());
        buf.push(0);
        align4(buf);
        buf.extend_from_slice(data);
        align4(buf);
    }

    let mut archive = Vec::new();
    let mut next_ino: u32 = 1;
    for entry in entries {
        let (name, mode, nlink, data): (&str, u32, u32, &[u8]) = match *entry {
            CpioEntry::File(path, perms, contents) => (path, S_IFREG | perms, 1, contents),
            CpioEntry::Dir(path, perms) => (path, S_IFDIR | perms, 2, &[]),
        };
        push_member(&mut archive, next_ino, name, mode, nlink, data);
        next_ino += 1;
    }
    push_member(&mut archive, next_ino, TRAILER, 0, 1, &[]);
    archive
}
/// Builds the newc-cpio initramfs used to boot a KVM guest into an OCI rootfs
/// with the supermachine agent, and writes it to `out_cpio`.
///
/// The archive contains `/init` (a busybox shell script), `/bin/busybox`,
/// `/supermachine-agent`, and one `/modules/<name>` entry per element of
/// `ordered_modules`. A `.zst` suffix on a module file marks it as
/// zstd-compressed; such modules are decompressed here and stored without
/// the suffix. The generated `/init` does the following:
/// - mounts proc/devtmpfs/sysfs;
/// - sets the clock from `supermachine.host_time=` on the kernel command line;
/// - `insmod`s the modules in the given order;
/// - overlays a tmpfs upper layer on the read-only squashfs at `/dev/vda`;
/// - writes a fallback `resolv.conf` when the image has none (or an empty one);
/// - mounts any `sm.volume=` / `sm.virtiofs=` entries;
/// - installs the agent, busybox and a `launch` script under
///   `/.supermachine`;
/// - `switch_root`s into the overlay.
///
/// # Errors
///
/// Returns a descriptive `Err(String)` in these cases:
/// - an input file cannot be read;
/// - a module file name is not UTF-8;
/// - zstd decoding fails;
/// - two modules resolve to the same name (they would overwrite each other
///   under `/modules`, and `/init` would load the survivor twice);
/// - the output cannot be written.
#[cfg_attr(
    not(all(target_os = "linux", target_arch = "x86_64")),
    allow(dead_code)
)]
pub(crate) fn build_kvm_agent_initramfs(
    agent_bin: &Path,
    busybox_bin: &Path,
    ordered_modules: &[PathBuf],
    out_cpio: &Path,
) -> Result<(), String> {
    let agent =
        std::fs::read(agent_bin).map_err(|e| format!("read agent {}: {e}", agent_bin.display()))?;
    let busybox = std::fs::read(busybox_bin)
        .map_err(|e| format!("read busybox {}: {e}", busybox_bin.display()))?;

    let mut modules: Vec<(String, Vec<u8>)> = Vec::with_capacity(ordered_modules.len());
    let mut seen = std::collections::HashSet::new();
    for m in ordered_modules {
        let (name, bytes) = load_initramfs_module(m)?;
        if !seen.insert(name.clone()) {
            return Err(format!(
                "duplicate initramfs module name {name} (from {}): it would overwrite an \
                 earlier module under /modules",
                m.display()
            ));
        }
        modules.push((name, bytes));
    }

    let module_names: Vec<&str> = modules.iter().map(|(n, _)| n.as_str()).collect();
    let init_bytes = render_kvm_init_script(&module_names).into_bytes();
    let module_paths: Vec<String> = modules
        .iter()
        .map(|(n, _)| format!("modules/{n}"))
        .collect();

    // Directories come first so every later path has an existing parent.
    let mut entries = vec![
        CpioEntry::Dir("bin", 0o755),
        CpioEntry::Dir("modules", 0o755),
        CpioEntry::Dir("proc", 0o755),
        CpioEntry::Dir("dev", 0o755),
        CpioEntry::Dir("sys", 0o755),
        CpioEntry::File("init", 0o755, &init_bytes),
        CpioEntry::File("bin/busybox", 0o755, &busybox),
        CpioEntry::File("supermachine-agent", 0o755, &agent),
    ];
    entries.extend(
        module_paths
            .iter()
            .zip(&modules)
            .map(|(path, (_, bytes))| CpioEntry::File(path.as_str(), 0o644, bytes.as_slice())),
    );

    let cpio = build_newc_cpio(&entries);
    std::fs::write(out_cpio, &cpio)
        .map_err(|e| format!("write initramfs {}: {e}", out_cpio.display()))
}

/// Reads one kernel module, decompressing it when the file name ends in `.zst`.
///
/// Returns the name the module is stored under in the initramfs (the file
/// name, minus any `.zst` suffix) together with the uncompressed bytes.
#[cfg_attr(
    not(all(target_os = "linux", target_arch = "x86_64")),
    allow(dead_code)
)]
fn load_initramfs_module(path: &Path) -> Result<(String, Vec<u8>), String> {
    use std::io::Read as _;
    let raw = std::fs::read(path).map_err(|e| format!("read module {}: {e}", path.display()))?;
    let fname = path
        .file_name()
        .and_then(|s| s.to_str())
        .ok_or_else(|| format!("bad module name {}", path.display()))?;
    let Some(stripped) = fname.strip_suffix(".zst") else {
        return Ok((fname.to_string(), raw));
    };
    let mut dec = ruzstd::StreamingDecoder::new(std::io::Cursor::new(&raw))
        .map_err(|e| format!("zstd init {}: {e}", path.display()))?;
    let mut out = Vec::new();
    dec.read_to_end(&mut out)
        .map_err(|e| format!("zstd decode {}: {e}", path.display()))?;
    Ok((stripped.to_string(), out))
}

/// Renders the `/init` busybox script for the KVM initramfs.
///
/// `module_names` are `insmod`ed from `/modules/` in the given order. They are
/// interpolated unquoted, so they must not contain shell metacharacters.
#[cfg_attr(
    not(all(target_os = "linux", target_arch = "x86_64")),
    allow(dead_code)
)]
fn render_kvm_init_script(module_names: &[&str]) -> String {
    let mut init = String::from("#!/bin/busybox sh\n");
    init.push_str("BB=/bin/busybox\n");
    init.push_str("$BB mount -t proc proc /proc 2>/dev/null\n");
    init.push_str("$BB mount -t devtmpfs dev /dev 2>/dev/null\n");
    init.push_str("$BB mount -t sysfs sys /sys 2>/dev/null\n");
    // Set the guest clock from `supermachine.host_time=<epoch secs>` if present.
    init.push_str(
        "for tok in $($BB cat /proc/cmdline); do case \"$tok\" in supermachine.host_time=*) \
         ht=\"${tok#supermachine.host_time=}\"; \
         $BB date -s \"@$ht\" >/dev/null 2>&1 || $BB date -u -D %s -s \"$ht\" >/dev/null 2>&1 || true;; \
         esac; done\n",
    );
    for name in module_names {
        init.push_str(&format!("$BB insmod /modules/{name}\n"));
    }
    // Read-only squashfs lower + tmpfs upper, merged with overlayfs at /newroot.
    init.push_str("$BB mkdir -p /lower /ov /newroot\n");
    init.push_str("$BB mount -t squashfs -o ro /dev/vda /lower\n");
    init.push_str("$BB mount -t tmpfs tmpfs /ov\n");
    init.push_str("$BB mkdir -p /ov/upper /ov/work\n");
    init.push_str(
        "$BB mount -t overlay overlay \
         -o lowerdir=/lower,upperdir=/ov/upper,workdir=/ov/work /newroot\n",
    );
    // Fallback DNS only when the image ships no (or an empty) resolv.conf.
    init.push_str(
        "[ -s /newroot/etc/resolv.conf ] || { $BB mkdir -p /newroot/etc; \
         printf 'nameserver 1.1.1.1\\nnameserver 8.8.8.8\\n' > /newroot/etc/resolv.conf; }\n",
    );
    // Block volumes: `sm.volume=<dev>:<guest path>`, ext4 first, then autodetect.
    init.push_str(
        "for tok in $($BB cat /proc/cmdline); do case \"$tok\" in sm.volume=*) \
         spec=\"${tok#sm.volume=}\"; dev=\"${spec%%:*}\"; mnt=\"${spec#*:}\"; \
         $BB mkdir -p \"/newroot$mnt\"; \
         $BB mount -t ext4 \"$dev\" \"/newroot$mnt\" || $BB mount \"$dev\" \"/newroot$mnt\";; \
         esac; done\n",
    );
    // Shared dirs: `sm.virtiofs=<tag>:<guest path>`, DAX first, then plain.
    init.push_str(
        "for tok in $($BB cat /proc/cmdline); do case \"$tok\" in sm.virtiofs=*) \
         spec=\"${tok#sm.virtiofs=}\"; tag=\"${spec%%:*}\"; mnt=\"${spec#*:}\"; \
         $BB mkdir -p \"/newroot$mnt\"; \
         $BB mount -t virtiofs -o dax \"$tag\" \"/newroot$mnt\" || \
         $BB mount -t virtiofs \"$tag\" \"/newroot$mnt\";; \
         esac; done\n",
    );
    init.push_str("$BB mkdir -p /newroot/.supermachine /newroot/proc /newroot/dev /newroot/sys\n");
    init.push_str("$BB cp /supermachine-agent /newroot/.supermachine/agent\n");
    init.push_str("$BB cp /bin/busybox /newroot/.supermachine/busybox\n");
    init.push_str("$BB chmod 755 /newroot/.supermachine/busybox\n");
    // The launch script starts the optional workload in the background, then
    // execs the agent.
    init.push_str(
        "$BB cat > /newroot/.supermachine/launch <<'SMLAUNCH'\n\
         #!/.supermachine/busybox sh\n\
         B=/.supermachine/busybox\n\
         $B mkdir -p /run\n\
         if [ -f /.supermachine/run-workload ]; then\n\
         eval \"$($B grep -E '^export ' /.supermachine/run-workload)\" 2>/dev/null || true\n\
         $B sh /.supermachine/run-workload &\n\
         echo $! > /run/supermachine-workload.pid\n\
         fi\n\
         exec /.supermachine/agent\n\
         SMLAUNCH\n",
    );
    init.push_str("$BB chmod 755 /newroot/.supermachine/launch\n");
    init.push_str("$BB mount --move /proc /newroot/proc\n");
    init.push_str("$BB mount --move /dev /newroot/dev\n");
    init.push_str("$BB mount --move /sys /newroot/sys\n");
    init.push_str("exec $BB switch_root /newroot /.supermachine/launch\n");
    init
}
/// Tests for KVM rootfs assembly (layer merge, whiteouts, squashfs output),
/// the run-workload staging, shell quoting, and the newc cpio writer.
#[cfg(test)]
mod kvm_rootfs_tests {
    use super::build_kvm_rootfs_squashfs;
    use std::path::PathBuf;
    /// Unique temp path per call, made from the process id plus a process-wide counter.
    fn tmp(tag: &str) -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static N: AtomicU64 = AtomicU64::new(0);
        let n = N.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!("sm-kvmroot-{tag}-{}-{n}", std::process::id()))
    }
    /// Writes a GNU tar at `path` containing regular 0644 files `(name, data)`.
    fn make_tar(path: &std::path::Path, entries: &[(&str, &[u8])]) {
        let f = std::fs::File::create(path).unwrap();
        let mut b = tar::Builder::new(f);
        for (name, data) in entries {
            let mut h = tar::Header::new_gnu();
            h.set_path(name).unwrap();
            h.set_size(data.len() as u64);
            h.set_mode(0o644);
            h.set_cksum();
            b.append(&h, *data).unwrap();
        }
        b.finish().unwrap();
    }
    // NOTE(review): cleanup at the end of each test only runs when every
    // assertion passes; failures leave files in the temp dir.
    #[test]
    fn merges_layers_applies_whiteouts_and_writes_squashfs() {
        let l1 = tmp("l1.tar");
        let l2 = tmp("l2.tar");
        make_tar(&l1, &[("a.txt", b"from-layer-1"), ("keep.txt", b"keep")]);
        // `.wh.a.txt` in the upper layer deletes `a.txt` from the lower one.
        make_tar(&l2, &[(".wh.a.txt", b""), ("b.txt", b"from-layer-2")]);
        let work = tmp("work");
        let out = tmp("rootfs.squashfs");
        build_kvm_rootfs_squashfs(&[l1.clone(), l2.clone()], &work, &out, None)
            .expect("build rootfs");
        assert!(work.join("b.txt").is_file(), "upper layer file present");
        assert!(work.join("keep.txt").is_file(), "lower-only file kept");
        assert!(!work.join("a.txt").exists(), "whited-out file removed");
        assert!(!work.join(".wh.a.txt").exists(), "whiteout marker removed");
        let bytes = std::fs::read(&out).expect("read squashfs");
        assert!(bytes.len() > 100, "squashfs non-empty");
        // Little-endian squashfs superblock magic.
        assert_eq!(&bytes[..4], b"hsqs", "squashfs magic");
        let _ = std::fs::remove_file(&l1);
        let _ = std::fs::remove_file(&l2);
        let _ = std::fs::remove_dir_all(&work);
        let _ = std::fs::remove_file(&out);
    }
    #[test]
    fn writes_squashfs_for_single_layer() {
        let l1 = tmp("s1.tar");
        make_tar(&l1, &[("etc/os-release", b"NAME=test\n")]);
        let work = tmp("swork");
        let out = tmp("s.squashfs");
        build_kvm_rootfs_squashfs(&[l1.clone()], &work, &out, None).expect("build");
        assert!(work.join("etc/os-release").is_file());
        assert_eq!(&std::fs::read(&out).unwrap()[..4], b"hsqs");
        let _ = std::fs::remove_file(&l1);
        let _ = std::fs::remove_dir_all(&work);
        let _ = std::fs::remove_file(&out);
    }
    /// A workload script passed to the builder lands at
    /// `.supermachine/run-workload` with mode 0755.
    #[test]
    fn workload_script_written_into_rootfs() {
        let l1 = tmp("w1.tar");
        make_tar(&l1, &[("etc/os-release", b"NAME=test\n")]);
        let work = tmp("wwork");
        let out = tmp("w.squashfs");
        let script = "#!/.supermachine/busybox sh\nexec 'nginx' '-g' 'daemon off;'\n";
        build_kvm_rootfs_squashfs(&[l1.clone()], &work, &out, Some(script)).expect("build");
        let run = work.join(".supermachine/run-workload");
        assert!(run.is_file(), "run-workload staged");
        assert_eq!(std::fs::read_to_string(&run).unwrap(), script);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt as _;
            let mode = std::fs::metadata(&run).unwrap().permissions().mode();
            assert_eq!(mode & 0o777, 0o755, "run-workload is executable");
        }
        let _ = std::fs::remove_file(&l1);
        let _ = std::fs::remove_dir_all(&work);
        let _ = std::fs::remove_file(&out);
    }
    /// Single-quoting must neutralise `;`, `$(...)`, backticks, and embedded quotes.
    #[test]
    fn sh_single_quote_escapes_metachars() {
        use super::sh_single_quote;
        assert_eq!(sh_single_quote("nginx"), "'nginx'");
        assert_eq!(sh_single_quote("daemon off;"), "'daemon off;'");
        assert_eq!(sh_single_quote("a'b"), "'a'\\''b'");
        assert_eq!(sh_single_quote("$(rm -rf /)"), "'$(rm -rf /)'");
        assert_eq!(sh_single_quote("`id`"), "'`id`'");
    }
    use super::{build_newc_cpio, CpioEntry};
    /// Minimal newc reader used only for assertions; returns `(name, mode, data)`
    /// for every record before `TRAILER!!!`.
    ///
    /// Header field offsets follow the 6-byte magic and 8-byte hex fields:
    /// mode at 14, filesize at 54, namesize at 94, name at 110. Padding is
    /// computed from absolute buffer offsets, which is only correct because
    /// every record starts 4-byte aligned. Panics on malformed input.
    fn parse_newc(buf: &[u8]) -> Vec<(String, u32, Vec<u8>)> {
        let hex = |b: &[u8]| u32::from_str_radix(std::str::from_utf8(b).unwrap(), 16).unwrap();
        let mut out = Vec::new();
        let mut p = 0usize;
        loop {
            assert_eq!(&buf[p..p + 6], b"070701", "newc magic at {p}");
            let mode = hex(&buf[p + 14..p + 22]);
            let filesize = hex(&buf[p + 54..p + 62]) as usize;
            let namesize = hex(&buf[p + 94..p + 102]) as usize;
            let name_start = p + 110;
            // namesize includes the trailing NUL, which is dropped here.
            let name =
                String::from_utf8(buf[name_start..name_start + namesize - 1].to_vec()).unwrap();
            let after_name = (name_start + namesize + 3) & !3;
            let data = buf[after_name..after_name + filesize].to_vec();
            let after_data = (after_name + filesize + 3) & !3;
            p = after_data;
            if name == "TRAILER!!!" {
                break;
            }
            out.push((name, mode, data));
        }
        out
    }
    #[test]
    fn newc_cpio_roundtrips() {
        let cpio = build_newc_cpio(&[
            CpioEntry::Dir("mnt", 0o755),
            CpioEntry::File("init", 0o755, b"#!/bin/sh\nexec /agent\n"),
            CpioEntry::File("modules/vsock.ko", 0o644, b"\x7fELF-fake-module"),
        ]);
        assert_eq!(cpio.len() % 4, 0);
        assert_eq!(&cpio[..6], b"070701");
        let recs = parse_newc(&cpio);
        assert_eq!(recs.len(), 3, "3 entries before trailer");
        assert_eq!(recs[0].0, "mnt");
        // 0o170000 is the S_IFMT file-type mask.
        assert_eq!(recs[0].1 & 0o170000, 0o040000, "dir bit");
        assert_eq!(recs[1].0, "init");
        assert_eq!(recs[1].1 & 0o170000, 0o100000, "regular file bit");
        assert_eq!(recs[1].1 & 0o777, 0o755, "exec perms");
        assert_eq!(recs[1].2, b"#!/bin/sh\nexec /agent\n");
        assert_eq!(recs[2].0, "modules/vsock.ko");
        assert_eq!(recs[2].2, b"\x7fELF-fake-module");
    }
}
/// Pins the balloon target (in pages) that `compute_balloon_target_pages`
/// returns for a handful of guest memory sizes given in MiB.
#[cfg(test)]
mod balloon_target_tests {
    use super::compute_balloon_target_pages;
    // NOTE(review): assuming 4 KiB pages, 8192 pages = 32 MiB of a 128 MiB guest.
    #[test]
    fn small_guest_caps_at_safety_floor() {
        assert_eq!(compute_balloon_target_pages(128), 8192);
    }
    // NOTE(review): assuming 4 KiB pages, 40960 pages = 160 MiB, i.e. 62.5% of
    // 256 MiB rather than 70% (70% would be ~45875 pages). The test name may be
    // stale; confirm against compute_balloon_target_pages.
    #[test]
    fn medium_guest_uses_70_percent() {
        assert_eq!(compute_balloon_target_pages(256), 40960);
    }
    // 1024 MiB = 262144 4 KiB pages; 70% of that is 183500.8, truncated here.
    #[test]
    fn large_guest_uses_70_percent() {
        assert_eq!(compute_balloon_target_pages(1024), 183500);
    }
    #[test]
    fn tiny_guest_no_ballooning() {
        assert_eq!(compute_balloon_target_pages(64), 0);
    }
}
/// Tests for guest console-log panic/oops detection, the encoded error
/// format, and the snapshot-delay fallback of `native_snapshot_after_ms`.
#[cfg(test)]
mod kernel_panic_detect_tests {
    use super::{detect_kernel_panic, encode_kernel_panic_err, PANIC_STACK_LINES};
    use std::io::Write;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};
    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
    /// A temp log file with the given contents, removed on drop.
    struct TempLog {
        path: PathBuf,
    }
    impl TempLog {
        fn new(contents: &str) -> Self {
            let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
            let path = std::env::temp_dir().join(format!(
                "supermachine-panic-test-{}-{id}.log",
                std::process::id()
            ));
            let mut f = std::fs::File::create(&path).expect("create tmp log");
            f.write_all(contents.as_bytes()).expect("write tmp log");
            Self { path }
        }
        fn path(&self) -> &std::path::Path {
            &self.path
        }
    }
    impl Drop for TempLog {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }
    #[test]
    fn detects_kernel_panic_banner() {
        let log = TempLog::new(
            "[ 0.001234] Booting Linux\n\
             [ 0.123456] something normal\n\
             [ 1.234567] Kernel panic - not syncing: foo\n\
             [ 1.234568] CPU: 0 PID: 1\n\
             [ 1.234569] Call trace:\n\
             [ 1.234570] panic+0x12c/0x34c\n",
        );
        let (first, stack) = detect_kernel_panic(log.path()).expect("panic detected");
        assert!(first.contains("Kernel panic"), "first_line: {first}");
        assert!(stack.iter().any(|l| l.contains("Call trace")));
    }
    #[test]
    fn detects_oops() {
        let log = TempLog::new(
            "boot fine\n\
             Internal error: Oops: 0000000096000004 [#1] PREEMPT SMP\n\
             Modules linked in:\n\
             pstate: 60400005\n",
        );
        let (first, stack) = detect_kernel_panic(log.path()).expect("oops detected");
        assert!(first.contains("Internal error: Oops"));
        assert_eq!(stack.len(), 2);
    }
    #[test]
    fn detects_unable_to_handle_null() {
        let log = TempLog::new(
            "[ok] running\n\
             [ok] still going\n\
             Unable to handle kernel NULL pointer dereference at virtual address 0\n",
        );
        let (first, _) = detect_kernel_panic(log.path()).expect("null deref detected");
        assert!(first.contains("NULL pointer dereference"));
    }
    #[test]
    fn detects_kill_idle_task() {
        let log = TempLog::new("Attempted to kill the idle task!\nsystem hosed\n");
        let (first, _) = detect_kernel_panic(log.path()).expect("idle-task detected");
        assert!(first.contains("Attempted to kill the idle task"));
    }
    #[test]
    fn detects_paging_request() {
        let log = TempLog::new(
            "ok\nUnable to handle kernel paging request at virtual address ffff000000000000\n",
        );
        let (first, _) = detect_kernel_panic(log.path()).expect("paging detected");
        assert!(first.contains("paging request"));
    }
    #[test]
    fn no_match_on_clean_log() {
        let log = TempLog::new(
            "[ 0.000000] Booting Linux on physical CPU 0x0000000000\n\
             [ 0.123456] init-oci: stage=workload-pre-exec\n\
             [ 0.987654] supermachine.worker: BAKE_READY\n",
        );
        assert!(detect_kernel_panic(log.path()).is_none());
    }
    #[test]
    fn stack_capture_is_bounded() {
        let mut text = String::from("Kernel panic - test\n");
        for i in 0..(PANIC_STACK_LINES + 50) {
            text.push_str(&format!("frame{i}\n"));
        }
        let log = TempLog::new(&text);
        let (_, stack) = detect_kernel_panic(log.path()).unwrap();
        assert_eq!(stack.len(), PANIC_STACK_LINES);
    }
    /// Encoded form: `KERNEL_PANIC|<first line>|<frames joined by 0x1F>`.
    #[test]
    fn encode_decode_round_trips() {
        let first = "Kernel panic - test".to_owned();
        let stack = vec!["frame_a".to_owned(), "frame_b".to_owned()];
        let encoded = encode_kernel_panic_err(&first, &stack);
        assert!(encoded.starts_with("KERNEL_PANIC|"));
        let rest = encoded.strip_prefix("KERNEL_PANIC|").unwrap();
        let mut parts = rest.splitn(2, '|');
        assert_eq!(parts.next().unwrap(), "Kernel panic - test");
        let stack_str = parts.next().unwrap();
        let decoded: Vec<&str> = stack_str.split('\x1F').collect();
        assert_eq!(decoded, vec!["frame_a", "frame_b"]);
    }

    /// Serialises the tests in this module that mutate
    /// `SUPERMACHINE_SNAPSHOT_AFTER_MS`.
    ///
    /// The default test harness runs tests on parallel threads. Without this
    /// lock, one test could restore a previously-set value while the other is
    /// reading it. NOTE(review): tests in other modules that read this
    /// variable are not covered by this lock.
    static SNAPSHOT_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Runs `native_snapshot_after_ms` for a minimal plan with `extra` args
    /// while `SUPERMACHINE_SNAPSHOT_AFTER_MS` is unset, then restores the
    /// variable's previous value.
    fn snapshot_after_ms_with_env_cleared(extra: Vec<String>) -> u64 {
        const VAR: &str = "SUPERMACHINE_SNAPSHOT_AFTER_MS";
        // A poisoned lock only means another test failed; the env is still usable.
        let _env_guard = SNAPSHOT_ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let prev = std::env::var(VAR).ok();
        // SAFETY: mutation of this variable is serialised by SNAPSHOT_ENV_LOCK.
        unsafe {
            std::env::remove_var(VAR);
        }
        let plan = super::BakePlan {
            image: "x",
            name: None,
            runtime: "supermachine",
            guest_port: 80,
            memory_mib: 256,
            vcpus: 1,
            pull_policy: "missing",
            snapshots_dir: std::path::Path::new("/tmp"),
            cmd_override: None,
            extra_args: &extra,
            platform: "linux/arm64",
        };
        let v = super::native_snapshot_after_ms(&plan);
        if let Some(p) = prev {
            // SAFETY: still holding SNAPSHOT_ENV_LOCK.
            unsafe {
                std::env::set_var(VAR, p);
            }
        }
        v
    }
    #[test]
    fn snapshot_after_ms_default_no_flag() {
        let v = snapshot_after_ms_with_env_cleared(Vec::new());
        assert_eq!(v, 200, "no listener-required ⇒ short fallback");
    }
    #[test]
    fn snapshot_after_ms_listener_required_keeps_7s() {
        let v = snapshot_after_ms_with_env_cleared(vec![
            "--supermachine-listener-required".to_owned(),
        ]);
        assert_eq!(v, 7000, "listener-required ⇒ keep 7s safety net");
    }
}
/// Tests for `parse_mount_arg`, which parses `HOST:TAG:GUEST[:SYMLINK_POLICY]`
/// into a JSON object with `host_path`, `guest_tag`, `guest_path` and an
/// optional `symlinks` key.
#[cfg(test)]
mod mount_arg_parsing_tests {
    use super::parse_mount_arg;
    #[test]
    fn three_field_mount_no_policy() {
        let v = parse_mount_arg("/host/x:workspace:/workspace").unwrap();
        assert_eq!(v["host_path"], "/host/x");
        assert_eq!(v["guest_tag"], "workspace");
        assert_eq!(v["guest_path"], "/workspace");
        // No policy field ⇒ the key is absent, not an empty string.
        assert!(v.get("symlinks").is_none());
    }
    #[test]
    fn four_field_mount_with_policy() {
        let v = parse_mount_arg("/h:t:/g:deny").unwrap();
        assert_eq!(v["host_path"], "/h");
        assert_eq!(v["guest_tag"], "t");
        assert_eq!(v["guest_path"], "/g");
        assert_eq!(v["symlinks"], "deny");
    }
    #[test]
    fn each_policy_round_trips() {
        for policy in ["opaque", "deny", "follow"] {
            let raw = format!("/h:t:/g:{policy}");
            let v = parse_mount_arg(&raw).unwrap();
            assert_eq!(v["symlinks"], policy);
        }
    }
    #[test]
    fn two_field_legacy_form_rejected() {
        assert!(parse_mount_arg("/h:tag").is_none());
        assert!(
            parse_mount_arg("/h:tag:").is_none(),
            "empty guest_path is also invalid"
        );
    }
    #[test]
    fn empty_tag_rejected() {
        assert!(parse_mount_arg("/h::/g").is_none());
    }
    // NOTE(review): duplicates the second assertion of
    // `two_field_legacy_form_rejected`.
    #[test]
    fn empty_guest_path_rejected() {
        assert!(parse_mount_arg("/h:tag:").is_none());
    }
    #[test]
    fn one_field_rejected() {
        assert!(parse_mount_arg("/just-host").is_none());
    }
    // Per the assertion message, the parser splits into at most 4 fields, so
    // extra `:`-separated text stays in the policy field. Only acceptance is
    // checked here, not the field contents.
    #[test]
    fn five_or_more_fields_treated_as_path_with_colons() {
        let v = parse_mount_arg("/h:t:/g:weirdpolicy:trailing");
        assert!(
            v.is_some(),
            "splitn(4) keeps the trailing token in the 4th field"
        );
    }
    // A trailing `:` yields `symlinks: ""` (the key is present, unlike the
    // 3-field form).
    #[test]
    fn policy_field_can_be_empty_treated_as_omitted() {
        let v = parse_mount_arg("/h:t:/g:").unwrap();
        assert_eq!(v["symlinks"], "");
    }
    #[test]
    fn guest_path_with_subdirs_preserved() {
        let v = parse_mount_arg("/h:t:/a/b/c/d").unwrap();
        assert_eq!(v["guest_path"], "/a/b/c/d");
    }
}
/// Tests for manifest integrity checks.
///
/// - Pinned `sha256:` references must match the fetched manifest bytes.
/// - A registry-advertised `Docker-Content-Digest` header must match the body.
/// - Tag references and non-sha256 digests are left unverified.
#[cfg(test)]
mod manifest_pin_verification_tests {
    use super::{
        parse_content_digest_header, sha256_bytes, verify_advertised_content_digest,
        verify_pinned_manifest_digest,
    };
    #[test]
    fn tag_reference_is_not_verified() {
        assert!(verify_pinned_manifest_digest("latest", b"arbitrary manifest bytes").is_ok());
        assert!(verify_pinned_manifest_digest("v1.2.3", b"\x00\xff\x10").is_ok());
    }
    #[test]
    fn matching_pinned_digest_is_accepted() {
        let body = br#"{"schemaVersion":2,"layers":[]}"#;
        let hex = sha256_bytes(body).unwrap();
        let reference = format!("sha256:{hex}");
        assert!(
            verify_pinned_manifest_digest(&reference, body).is_ok(),
            "honest registry serving the pinned manifest must pass",
        );
    }
    // A registry serving different bytes for a pinned digest must be refused,
    // and the error text must say so.
    #[test]
    fn mismatched_pinned_digest_is_rejected() {
        let pinned_body = br#"{"schemaVersion":2,"config":{"digest":"sha256:aaaa"}}"#;
        let hex = sha256_bytes(pinned_body).unwrap();
        let reference = format!("sha256:{hex}");
        let tampered_body = br#"{"schemaVersion":2,"config":{"digest":"sha256:bbbb"}}"#;
        let err = verify_pinned_manifest_digest(&reference, tampered_body)
            .expect_err("tampered manifest must be rejected");
        assert!(err.contains("digest mismatch"), "unexpected error: {err}");
        assert!(err.contains("refusing to bake"), "unexpected error: {err}");
    }
    #[test]
    fn flipping_one_byte_is_detected() {
        let mut body = b"the original manifest document".to_vec();
        let hex = sha256_bytes(&body).unwrap();
        let reference = format!("sha256:{hex}");
        assert!(verify_pinned_manifest_digest(&reference, &body).is_ok());
        body[0] ^= 1;
        assert!(
            verify_pinned_manifest_digest(&reference, &body).is_err(),
            "a one-byte change slipped past the digest pin",
        );
    }
    #[test]
    fn non_sha256_algorithm_is_left_unverified() {
        assert!(verify_pinned_manifest_digest("sha512:deadbeef", b"anything").is_ok());
    }
    #[test]
    fn content_digest_header_is_parsed_case_insensitively() {
        let headers = "HTTP/1.1 200 OK\r\n\
                       Content-Type: application/vnd.oci.image.manifest.v1+json\r\n\
                       docker-content-digest: sha256:abc123\r\n\
                       Content-Length: 42\r\n";
        assert_eq!(
            parse_content_digest_header(headers).as_deref(),
            Some("sha256:abc123"),
        );
    }
    #[test]
    fn absent_content_digest_header_is_a_noop() {
        let headers = "HTTP/1.1 200 OK\r\nContent-Length: 3\r\n";
        assert!(parse_content_digest_header(headers).is_none());
        assert!(verify_advertised_content_digest(headers, b"anything").is_ok());
    }
    #[test]
    fn advertised_digest_matching_body_is_accepted() {
        let body = br#"{"schemaVersion":2}"#;
        let hex = sha256_bytes(body).unwrap();
        let headers = format!("HTTP/1.1 200 OK\r\nDocker-Content-Digest: sha256:{hex}\r\n");
        assert!(verify_advertised_content_digest(&headers, body).is_ok());
    }
    #[test]
    fn advertised_digest_not_matching_body_is_rejected() {
        let real = br#"{"schemaVersion":2}"#;
        let other = br#"{"schemaVersion":2,"evil":true}"#;
        let other_hex = sha256_bytes(other).unwrap();
        let headers = format!("HTTP/1.1 200 OK\r\nDocker-Content-Digest: sha256:{other_hex}\r\n");
        let err = verify_advertised_content_digest(&headers, real)
            .expect_err("mislabeled content must be rejected");
        assert!(
            err.contains("mislabeled content"),
            "unexpected error: {err}"
        );
    }
    #[test]
    fn advertised_non_sha256_digest_is_left_unverified() {
        let headers = "HTTP/1.1 200 OK\r\nDocker-Content-Digest: sha512:deadbeef\r\n";
        assert!(verify_advertised_content_digest(headers, b"anything").is_ok());
    }
}
/// Tests for the path-traversal guards used when turning registry/OCI digests
/// and layout-relative paths into filesystem paths.
#[cfg(test)]
mod digest_path_guard_tests {
    use super::{confined_join, sha256_path_component};
    use std::path::Path;
    // Both `sha256:<hex>` and bare `<hex>` yield the bare hex component.
    #[test]
    fn sha256_component_accepts_valid_digest() {
        let hex = "a".repeat(64);
        assert_eq!(
            sha256_path_component(&format!("sha256:{hex}")).unwrap(),
            hex
        );
        assert_eq!(sha256_path_component(&hex).unwrap(), hex);
    }
    #[test]
    fn sha256_component_rejects_traversal() {
        for bad in [
            "sha256:../../etc/passwd",
            "sha256:..",
            "sha256:a/b",
            "sha256:../x",
            "../../escape",
            "sha256:",
            "",
        ] {
            assert!(
                sha256_path_component(bad).is_err(),
                "{bad:?} must be rejected as unsafe"
            );
        }
    }
    #[test]
    fn confined_join_allows_in_tree_paths() {
        let root = Path::new("/base");
        assert_eq!(
            confined_join(root, "blobs/sha256/abc").unwrap(),
            Path::new("/base/blobs/sha256/abc")
        );
        assert_eq!(
            confined_join(root, "config.json").unwrap(),
            Path::new("/base/config.json")
        );
    }
    // Absolute paths and any `..` that climbs above `root` must be rejected.
    #[test]
    fn confined_join_rejects_escapes() {
        let root = Path::new("/base");
        for bad in [
            "../../etc/passwd",
            "/etc/passwd",
            "a/../../b",
            "blobs/../../escape",
        ] {
            assert!(
                confined_join(root, bad).is_err(),
                "{bad:?} must be rejected as a traversal"
            );
        }
    }
}
/// Security tests for `extract_layer_tar`: path-traversal/symlink confinement
/// and the decompressed-size cap.
#[cfg(test)]
mod tar_extract_tests {
    use super::extract_layer_tar;
    use std::path::PathBuf;
    /// Hand-rolls one ustar header + data + padding, without path sanitising,
    /// so hostile names (`../x`, `/abs`) and symlinks can be encoded verbatim.
    fn ustar(name: &str, typeflag: u8, linkname: &str, data: &[u8]) -> Vec<u8> {
        let mut h = [0u8; 512];
        let nb = name.as_bytes();
        h[0..nb.len().min(100)].copy_from_slice(&nb[..nb.len().min(100)]);
        h[100..108].copy_from_slice(b"0000644\0");
        h[108..116].copy_from_slice(b"0000000\0");
        h[116..124].copy_from_slice(b"0000000\0");
        h[124..136].copy_from_slice(format!("{:011o}\0", data.len()).as_bytes());
        h[136..148].copy_from_slice(b"00000000000\0");
        h[156] = typeflag;
        let lb = linkname.as_bytes();
        h[157..157 + lb.len().min(100)].copy_from_slice(&lb[..lb.len().min(100)]);
        h[257..263].copy_from_slice(b"ustar\0");
        h[263..265].copy_from_slice(b"00");
        // The checksum is computed with the checksum field itself set to spaces.
        for b in &mut h[148..156] {
            *b = b' ';
        }
        let sum: u32 = h.iter().map(|&b| b as u32).sum();
        h[148..156].copy_from_slice(format!("{sum:06o}\0 ").as_bytes());
        let mut out = h.to_vec();
        out.extend_from_slice(data);
        let pad = (512 - data.len() % 512) % 512;
        out.extend(std::iter::repeat(0u8).take(pad));
        out
    }
    fn tmp(tag: &str) -> PathBuf {
        let n = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("sm-tarslip-{tag}-{}-{n}", std::process::id()))
    }
    /// Sets an environment variable for the guard's lifetime and restores the
    /// previous value (or removes the variable) on drop, even if the test panics.
    struct EnvOverride {
        key: &'static str,
        prev: Option<std::ffi::OsString>,
    }
    impl EnvOverride {
        fn set(key: &'static str, value: &str) -> Self {
            let prev = std::env::var_os(key);
            // SAFETY: only this test touches this variable; the guard restores it.
            unsafe {
                std::env::set_var(key, value);
            }
            Self { key, prev }
        }
    }
    impl Drop for EnvOverride {
        fn drop(&mut self) {
            // SAFETY: see `EnvOverride::set`.
            unsafe {
                match self.prev.take() {
                    Some(v) => std::env::set_var(self.key, v),
                    None => std::env::remove_var(self.key),
                }
            }
        }
    }
    /// Extraction must stay inside `dest`. This covers `../` entries, absolute
    /// paths, and a symlink (`evil_link -> ..`) followed by a write through it.
    #[test]
    fn tar_extract_confinement() {
        const FILE: u8 = b'0';
        const SYMLINK: u8 = b'2';
        let base = tmp("base");
        let dest = base.join("layer_extract");
        std::fs::create_dir_all(&dest).unwrap();
        let mut blob = Vec::new();
        blob.extend(ustar("../escape_dotdot", FILE, "", b"PWNED"));
        blob.extend(ustar("good", FILE, "", b"ok"));
        blob.extend(ustar("evil_link", SYMLINK, "..", b""));
        blob.extend(ustar("evil_link/through", FILE, "", b"PWNED2"));
        blob.extend(ustar("/abs_escape", FILE, "", b"PWNED3"));
        // Two zero blocks mark end-of-archive.
        blob.extend(std::iter::repeat(0u8).take(1024));
        let blob_path = base.join("evil.tar");
        std::fs::write(&blob_path, &blob).unwrap();
        extract_layer_tar(&blob_path, &dest).expect("extract returns Ok");
        assert_eq!(
            std::fs::read_to_string(dest.join("good")).unwrap_or_default(),
            "ok",
            "legitimate entry must extract inside dest"
        );
        assert!(
            !base.join("escape_dotdot").exists(),
            "`../` traversal escaped the extraction dir"
        );
        assert!(
            !base.join("through").exists() && !base.join("PWNED2").exists(),
            "symlink-escape wrote outside the extraction dir"
        );
        let stray: Vec<_> = std::fs::read_dir(&base)
            .unwrap()
            .flatten()
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .filter(|n| n != "layer_extract" && n != "evil.tar")
            .collect();
        assert!(
            stray.is_empty(),
            "unexpected escaped entries in base: {stray:?}"
        );
        let _ = std::fs::remove_dir_all(&base);
    }
    /// A 1 MiB entry that compresses to a few KiB must trip a 64 KiB
    /// `SUPERMACHINE_MAX_LAYER_BYTES` cap.
    #[test]
    fn decompression_bomb_rejected() {
        let dir = tmp("bomb");
        std::fs::create_dir_all(&dir).unwrap();
        let enc = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::best());
        let mut ar = tar::Builder::new(enc);
        let big = vec![0u8; 1 << 20];
        let mut h = tar::Header::new_gnu();
        h.set_size(big.len() as u64);
        h.set_mode(0o644);
        h.set_cksum();
        ar.append_data(&mut h, "big", &big[..]).unwrap();
        let gz = ar.into_inner().unwrap().finish().unwrap();
        let blob = dir.join("bomb.tar.gz");
        std::fs::write(&blob, &gz).unwrap();
        let dest = dir.join("out");
        std::fs::create_dir_all(&dest).unwrap();
        let r = {
            let _cap = EnvOverride::set("SUPERMACHINE_MAX_LAYER_BYTES", "65536");
            extract_layer_tar(&blob, &dest)
        };
        assert!(
            r.is_err() && format!("{r:?}").contains("cap"),
            "decompression bomb should be rejected, got {r:?}"
        );
        let _ = std::fs::remove_dir_all(&dir);
    }
}
/// Tests for `layer_shas_from_save_dir`, which resolves an OCI image layout
/// (e.g. `docker save` output) to the layer digests for one architecture.
///
/// Covered cases: nested and flat indexes, and the legacy `manifest.json`
/// fallback.
#[cfg(test)]
mod oci_save_dir_tests {
    use serde_json::json;
    /// Stores `bytes` under `blobs/sha256/<hex>` and returns the hex digest.
    fn write_blob(layout: &std::path::Path, bytes: &[u8]) -> String {
        use std::io::Write;
        let sha = super::sha256_bytes(bytes).expect("sha256");
        let dir = layout.join("blobs/sha256");
        std::fs::create_dir_all(&dir).unwrap();
        let mut f = std::fs::File::create(dir.join(&sha)).unwrap();
        f.write_all(bytes).unwrap();
        sha
    }
    /// Serialises `v` compactly and stores it as a blob; returns its hex digest.
    fn write_json(layout: &std::path::Path, v: &serde_json::Value) -> String {
        let bytes = serde_json::to_vec(v).unwrap();
        write_blob(layout, &bytes)
    }
    /// Creates a fresh, empty layout dir (wiping any previous one of the same name).
    // NOTE(review): the layout dir is not removed after the test, so
    // directories accumulate in the temp dir across runs (one per pid).
    fn scratch_layout(name: &str) -> std::path::PathBuf {
        let p = std::env::temp_dir().join(format!(
            "supermachine-oci-save-test-{}-{name}",
            std::process::id()
        ));
        let _ = std::fs::remove_dir_all(&p);
        std::fs::create_dir_all(&p).unwrap();
        std::fs::create_dir_all(p.join("blobs/sha256")).unwrap();
        p
    }
    /// Writes the top-level `index.json` of the layout.
    fn write_index(layout: &std::path::Path, idx: &serde_json::Value) {
        let bytes = serde_json::to_vec_pretty(idx).unwrap();
        std::fs::write(layout.join("index.json"), bytes).unwrap();
    }
    // index.json -> inner image index blob -> amd64 manifest -> layers.
    // The layer digests only need to be referenced, not present as blobs.
    #[test]
    fn nested_index_resolves_amd64() {
        let layout = scratch_layout("nested-amd64");
        let manifest = json!({
            "schemaVersion": 2,
            "mediaType": "application/vnd.oci.image.manifest.v1+json",
            "config": { "mediaType": "application/vnd.oci.image.config.v1+json",
                        "digest": "sha256:dead", "size": 0 },
            "layers": [
                { "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
                  "digest": "sha256:aaa", "size": 1 },
                { "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
                  "digest": "sha256:bbb", "size": 2 }
            ]
        });
        let manifest_sha = write_json(&layout, &manifest);
        let inner_index = json!({
            "schemaVersion": 2,
            "mediaType": "application/vnd.oci.image.index.v1+json",
            "manifests": [
                { "mediaType": "application/vnd.oci.image.manifest.v1+json",
                  "digest": format!("sha256:{manifest_sha}"),
                  "size": 0,
                  "platform": { "architecture": "amd64", "os": "linux" } }
            ]
        });
        let inner_sha = write_json(&layout, &inner_index);
        let outer = json!({
            "schemaVersion": 2,
            "manifests": [
                { "mediaType": "application/vnd.oci.image.index.v1+json",
                  "digest": format!("sha256:{inner_sha}"),
                  "size": 0 }
            ]
        });
        write_index(&layout, &outer);
        let shas =
            super::layer_shas_from_save_dir("test/amd64", &layout, "amd64").expect("resolve");
        assert_eq!(shas, vec!["aaa".to_owned(), "bbb".to_owned()]);
    }
    // index.json points straight at a platform-tagged manifest.
    #[test]
    fn flat_index_resolves_arch() {
        let layout = scratch_layout("flat-arm64");
        let manifest = json!({
            "schemaVersion": 2,
            "mediaType": "application/vnd.oci.image.manifest.v1+json",
            "config": { "mediaType": "application/vnd.oci.image.config.v1+json",
                        "digest": "sha256:dead", "size": 0 },
            "layers": [
                { "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip",
                  "digest": "sha256:ccc", "size": 1 }
            ]
        });
        let m_sha = write_json(&layout, &manifest);
        let outer = json!({
            "schemaVersion": 2,
            "manifests": [
                { "mediaType": "application/vnd.oci.image.manifest.v1+json",
                  "digest": format!("sha256:{m_sha}"),
                  "size": 0,
                  "platform": { "architecture": "arm64", "os": "linux" } }
            ]
        });
        write_index(&layout, &outer);
        let shas = super::layer_shas_from_save_dir("test/arm64", &layout, "arm64").expect("ok");
        assert_eq!(shas, vec!["ccc".to_owned()]);
    }
    // Two platform manifests in one index; each arch must get its own layers.
    #[test]
    fn multi_arch_picks_requested() {
        let layout = scratch_layout("multi-arch");
        let m_arm = write_json(
            &layout,
            &json!({
                "schemaVersion": 2,
                "mediaType": "application/vnd.oci.image.manifest.v1+json",
                "config": { "digest": "sha256:c1", "size": 0,
                            "mediaType": "application/vnd.oci.image.config.v1+json" },
                "layers": [ { "digest": "sha256:arm1", "size": 1,
                              "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip" } ]
            }),
        );
        let m_amd = write_json(
            &layout,
            &json!({
                "schemaVersion": 2,
                "mediaType": "application/vnd.oci.image.manifest.v1+json",
                "config": { "digest": "sha256:c2", "size": 0,
                            "mediaType": "application/vnd.oci.image.config.v1+json" },
                "layers": [ { "digest": "sha256:amd1", "size": 1,
                              "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip" } ]
            }),
        );
        let outer = json!({
            "schemaVersion": 2,
            "manifests": [
                { "mediaType": "application/vnd.oci.image.manifest.v1+json",
                  "digest": format!("sha256:{m_arm}"), "size": 0,
                  "platform": { "architecture": "arm64", "os": "linux" } },
                { "mediaType": "application/vnd.oci.image.manifest.v1+json",
                  "digest": format!("sha256:{m_amd}"), "size": 0,
                  "platform": { "architecture": "amd64", "os": "linux" } }
            ]
        });
        write_index(&layout, &outer);
        assert_eq!(
            super::layer_shas_from_save_dir("multi", &layout, "amd64").unwrap(),
            vec!["amd1".to_owned()]
        );
        assert_eq!(
            super::layer_shas_from_save_dir("multi", &layout, "arm64").unwrap(),
            vec!["arm1".to_owned()]
        );
    }
    // Only arm64 present, amd64 requested, no manifest.json: either error
    // wording is accepted.
    #[test]
    fn missing_arch_errors() {
        let layout = scratch_layout("missing-arch");
        let m = write_json(
            &layout,
            &json!({
                "schemaVersion": 2,
                "mediaType": "application/vnd.oci.image.manifest.v1+json",
                "config": { "digest": "sha256:c", "size": 0,
                            "mediaType": "application/vnd.oci.image.config.v1+json" },
                "layers": [ { "digest": "sha256:l", "size": 1,
                              "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip" } ]
            }),
        );
        let outer = json!({
            "schemaVersion": 2,
            "manifests": [
                { "mediaType": "application/vnd.oci.image.manifest.v1+json",
                  "digest": format!("sha256:{m}"), "size": 0,
                  "platform": { "architecture": "arm64", "os": "linux" } }
            ]
        });
        write_index(&layout, &outer);
        let e = super::layer_shas_from_save_dir("only-arm", &layout, "amd64").unwrap_err();
        assert!(
            e.contains("no amd64 manifest")
                || e.contains("legacy manifest.json fallback also failed"),
            "got: {e}"
        );
    }
    // The inner index references a manifest blob that is absent from the
    // layout. Resolution must fall back to the legacy Docker `manifest.json`
    // (Config + Layers paths) and strip `blobs/sha256/` from the layer paths.
    #[test]
    fn docker_save_missing_inner_manifest_falls_back_to_legacy() {
        let layout = scratch_layout("docker-save-fallback");
        let inner_index = json!({
            "schemaVersion": 2,
            "mediaType": "application/vnd.oci.image.index.v1+json",
            "manifests": [
                { "mediaType": "application/vnd.oci.image.manifest.v1+json",
                  "digest": "sha256:dead0000missingblob",
                  "size": 2319,
                  "platform": { "architecture": "amd64", "os": "linux" } }
            ]
        });
        let inner_sha = write_json(&layout, &inner_index);
        let outer = json!({
            "schemaVersion": 2,
            "manifests": [
                { "mediaType": "application/vnd.oci.image.index.v1+json",
                  "digest": format!("sha256:{inner_sha}"),
                  "size": 0 }
            ]
        });
        write_index(&layout, &outer);
        let config_bytes = serde_json::to_vec(&json!({
            "architecture": "amd64",
            "config": {}
        }))
        .unwrap();
        let config_sha = write_blob(&layout, &config_bytes);
        let legacy = json!([
            {
                "Config": format!("blobs/sha256/{config_sha}"),
                "RepoTags": ["multi-arch:latest"],
                "Layers": [
                    "blobs/sha256/layeraaa",
                    "blobs/sha256/layerbbb"
                ]
            }
        ]);
        std::fs::write(
            layout.join("manifest.json"),
            serde_json::to_vec(&legacy).unwrap(),
        )
        .unwrap();
        let shas = super::layer_shas_from_save_dir("multi-arch:latest", &layout, "amd64")
            .expect("fallback");
        assert_eq!(shas, vec!["layeraaa".to_owned(), "layerbbb".to_owned()]);
    }
    // The legacy fallback checks the config's `architecture` and returns an
    // actionable `--platform` hint on mismatch.
    #[test]
    fn legacy_fallback_rejects_wrong_arch() {
        let layout = scratch_layout("legacy-wrong-arch");
        write_index(&layout, &json!({ "schemaVersion": 2, "manifests": [] }));
        let config_bytes = serde_json::to_vec(&json!({
            "architecture": "arm64",
            "config": {}
        }))
        .unwrap();
        let config_sha = write_blob(&layout, &config_bytes);
        let legacy = json!([
            {
                "Config": format!("blobs/sha256/{config_sha}"),
                "RepoTags": ["x:latest"],
                "Layers": [ "blobs/sha256/lll" ]
            }
        ]);
        std::fs::write(
            layout.join("manifest.json"),
            serde_json::to_vec(&legacy).unwrap(),
        )
        .unwrap();
        let e = super::layer_shas_from_save_dir("x:latest", &layout, "amd64").unwrap_err();
        assert!(
            e.contains("describes a linux/arm64 image") && e.contains("--platform linux/amd64"),
            "expected actionable arch-mismatch hint, got: {e}"
        );
    }
}
/// Delay in milliseconds before the native runtime takes its snapshot.
///
/// Precedence:
/// 1. a parseable `--supermachine-snapshot-after-ms` extra argument;
/// 2. a parseable `SUPERMACHINE_SNAPSHOT_AFTER_MS` environment variable;
/// 3. a fallback of 7000 ms when `--supermachine-listener-required` is
///    present, otherwise 200 ms.
///
/// Unparseable values fall through to the next source.
fn native_snapshot_after_ms(plan: &BakePlan<'_>) -> u64 {
    const LISTENER_REQUIRED_FALLBACK_MS: u64 = 7000;
    const DEFAULT_FALLBACK_MS: u64 = 200;
    arg_value(plan.extra_args, "--supermachine-snapshot-after-ms")
        .and_then(|raw| raw.parse::<u64>().ok())
        .or_else(|| {
            std::env::var("SUPERMACHINE_SNAPSHOT_AFTER_MS")
                .ok()
                .and_then(|raw| raw.parse::<u64>().ok())
        })
        .unwrap_or_else(|| {
            if has_flag(plan.extra_args, "--supermachine-listener-required") {
                LISTENER_REQUIRED_FALLBACK_MS
            } else {
                DEFAULT_FALLBACK_MS
            }
        })
}
/// Returns `true` when `name` appears verbatim as one of `extra_args`.
fn has_flag(extra_args: &[String], name: &str) -> bool {
    extra_args.iter().map(String::as_str).any(|arg| arg == name)
}
/// Full set of native-bake inputs stored as `native_bake_inputs` in snapshot metadata and
/// hashed (with `layers` and `delta_key` added) by `native_supermachine_bake_key`.
///
/// It covers:
/// - the plan's image, port, memory, vcpus, extra args and `cmd_override`;
/// - the resolved image id, architecture, effective cmd, working dir, user and env;
/// - values derived from extra args (egress policy, listener settle / snapshot-after ms);
/// - `file_size_mtime` fingerprints of the runtime binary, kernel, init and agent;
/// - several environment-controlled toggles.
///
/// `native_supermachine_cheap_inputs` builds a cheaper variant with mostly the same keys that
/// needs no `ImageResolution`.
///
/// # Errors
/// Propagates failures from `ensure_init_oci`, `ensure_supermachine_agent` and
/// `file_size_mtime`.
// NOTE(review): the encoded form of this value is hashed into the bake key, so renaming or
// reordering keys presumably invalidates existing cached snapshots (ordering only matters if
// serde_json's `preserve_order` feature is enabled) — confirm before editing.
fn native_supermachine_base_inputs(
plan: &BakePlan<'_>,
resolution: &ImageResolution,
root: &Path,
) -> Result<serde_json::Value, String> {
let sm22_bin = supermachine_worker_bin(root);
let kernel = supermachine_kernel(root);
let init = ensure_init_oci(root)?;
let agent = ensure_supermachine_agent(root)?;
// Defaults to "fast" when the variable is unset or not valid Unicode.
let snapshot_hash_mode =
std::env::var("SUPERMACHINE_SNAPSHOT_HASH_MODE").unwrap_or_else(|_| "fast".to_owned());
Ok(serde_json::json!({
"version": 1,
"runtime": "supermachine",
"image": plan.image,
"image_id": resolution.image_id,
"architecture": resolution.architecture,
"guest_port": plan.guest_port,
"memory_mib": plan.memory_mib,
"vcpus": plan.vcpus,
"cmd": resolution.effective_cmd,
"cmd_override": plan.cmd_override,
"working_dir": resolution.working_dir,
"user": resolution.user,
"env": env_json_value(plan, resolution),
"extra_args": plan.extra_args,
"egress_policy": arg_value(plan.extra_args, "--egress-policy").unwrap_or(""),
"listener_settle_ms": native_listener_settle_ms(plan),
"snapshot_after_ms": native_snapshot_after_ms(plan),
// Fingerprints of the host-side artifacts; a rebuilt binary/kernel changes the inputs.
"runtime_bin": file_size_mtime(&sm22_bin)?,
"kernel": file_size_mtime(&kernel)?,
"init_oci": file_size_mtime(&init)?,
"agent": file_size_mtime(&agent)?,
"snapshot_hash_mode": snapshot_hash_mode,
"host_ipv6": crate::utils::net::cached_host_ipv6_route(),
// True only when SUPERMACHINE_GUEST_IPV6 is exactly "0".
"guest_ipv6_disabled":
std::env::var("SUPERMACHINE_GUEST_IPV6").as_deref() == Ok("0"),
// True when SUPERMACHINE_KCACHE is "1" or "true"; false otherwise (including unset).
"kcache_enabled": std::env::var("SUPERMACHINE_KCACHE")
.as_deref()
.map(|v| v == "1" || v == "true")
.unwrap_or(false),
}))
}
/// Keys compared between stored `native_bake_inputs` and
/// `native_supermachine_cheap_inputs` by the fast-path reuse checks.
///
/// These are exactly the keys `native_supermachine_cheap_inputs` emits, in the same order.
fn native_supermachine_cheap_input_keys() -> &'static [&'static str] {
    const CHEAP_KEYS: &[&str] = &[
        // identity of the bake
        "version",
        "runtime",
        "image",
        // VM shape
        "guest_port",
        "memory_mib",
        "vcpus",
        "architecture",
        // user-supplied knobs
        "extra_args",
        "cmd_override",
        "egress_policy",
        "listener_settle_ms",
        "snapshot_after_ms",
        // host artifact fingerprints
        "runtime_bin",
        "kernel",
        "init_oci",
        "agent",
        // environment toggles
        "snapshot_hash_mode",
        "host_ipv6",
        "guest_ipv6_disabled",
        "kcache_enabled",
    ];
    CHEAP_KEYS
}
fn native_supermachine_cheap_inputs(
plan: &BakePlan<'_>,
root: &Path,
) -> Result<serde_json::Value, String> {
let sm22_bin = supermachine_worker_bin(root);
let kernel = supermachine_kernel(root);
let init = ensure_init_oci(root)?;
let agent = ensure_supermachine_agent(root)?;
let snapshot_hash_mode =
std::env::var("SUPERMACHINE_SNAPSHOT_HASH_MODE").unwrap_or_else(|_| "fast".to_owned());
Ok(serde_json::json!({
"version": 1,
"runtime": "supermachine",
"image": plan.image,
"guest_port": plan.guest_port,
"memory_mib": plan.memory_mib,
"vcpus": plan.vcpus,
"architecture": plan.arch(),
"extra_args": plan.extra_args,
"cmd_override": plan.cmd_override,
"egress_policy": arg_value(plan.extra_args, "--egress-policy").unwrap_or(""),
"listener_settle_ms": native_listener_settle_ms(plan),
"snapshot_after_ms": native_snapshot_after_ms(plan),
"runtime_bin": file_size_mtime(&sm22_bin)?,
"kernel": file_size_mtime(&kernel)?,
"init_oci": file_size_mtime(&init)?,
"agent": file_size_mtime(&agent)?,
"snapshot_hash_mode": snapshot_hash_mode,
"host_ipv6": crate::utils::net::cached_host_ipv6_route(),
"guest_ipv6_disabled":
std::env::var("SUPERMACHINE_GUEST_IPV6").as_deref() == Ok("0"),
"kcache_enabled": std::env::var("SUPERMACHINE_KCACHE")
.as_deref()
.map(|v| v == "1" || v == "true")
.unwrap_or(false),
}))
}
/// Reuses the existing snapshot for this plan when its stored cheap inputs still match.
///
/// The check is skipped (`Ok(None)`) in any of these cases:
/// - `pull_policy` is `"always"`;
/// - reuse is disabled, or the plan uses args that prevent early reuse;
/// - there is no `metadata.json`;
/// - the metadata is not a native supermachine bake;
/// - any cheap input differs;
/// - the snapshot files are missing.
///
/// # Errors
/// Fails when the metadata exists but cannot be read or parsed, or when the cheap inputs
/// cannot be computed.
fn try_fast_cache_hit(
    plan: &BakePlan<'_>,
    root: &Path,
    run_t0: Instant,
) -> Result<Option<NativeBakeResult>, String> {
    let eligible = plan.pull_policy != "always"
        && !snapshot_reuse_disabled()
        && native_supermachine_early_reuse_supported(plan);
    if !eligible {
        return Ok(None);
    }
    let meta_path = plan.metadata_path();
    let text = match std::fs::read_to_string(&meta_path) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(format!("read metadata {}: {e}", meta_path.display())),
        Ok(text) => text,
    };
    let meta: serde_json::Value = serde_json::from_str(&text)
        .map_err(|e| format!("parse metadata {}: {e}", meta_path.display()))?;
    let is_native =
        meta.get("supermachine_version").and_then(serde_json::Value::as_str) == Some("supermachine");
    let stored_inputs = match (
        is_native,
        meta.get("native_bake_inputs").and_then(serde_json::Value::as_object),
    ) {
        (true, Some(inputs)) => inputs,
        _ => return Ok(None),
    };
    let cheap = native_supermachine_cheap_inputs(plan, root)?;
    let Some(cheap_fields) = cheap.as_object() else {
        return Err("cheap bake inputs was not an object".to_owned());
    };
    let inputs_match = native_supermachine_cheap_input_keys()
        .iter()
        .all(|&key| stored_inputs.get(key) == cheap_fields.get(key));
    if !inputs_match || !metadata_snapshot_files_exist(&meta) {
        return Ok(None);
    }
    Ok(Some(NativeBakeResult {
        total_ms: 0,
        timings: serde_json::json!({
            "total_ms": 0,
            "snapshot_reused": true,
            "fast_snapshot_reused": true,
            "reuse_check_total_ms": elapsed_ms(run_t0),
        }),
        reused: true,
    }))
}
fn try_sibling_clone_fast_path(
plan: &BakePlan<'_>,
root: &Path,
run_t0: Instant,
) -> Result<Option<NativeBakeResult>, String> {
if plan.pull_policy == "always" {
return Ok(None);
}
if std::env::var("SUPERMACHINE_SIBLING_CLONE").as_deref() == Ok("0") {
return Ok(None);
}
if snapshot_reuse_disabled() || !native_supermachine_early_reuse_supported(plan) {
return Ok(None);
}
let target_dir = plan.snapshots_dir.join(plan.snapshot_name());
let cheap = native_supermachine_cheap_inputs(plan, root)?;
let cheap_obj = cheap
.as_object()
.ok_or_else(|| "cheap bake inputs was not an object".to_owned())?;
let entries = match std::fs::read_dir(plan.snapshots_dir) {
Ok(e) => e,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(format!("read snapshots dir: {e}")),
};
let target_canon = std::fs::canonicalize(&target_dir).ok();
let mut best: Option<(String, std::path::PathBuf)> = None;
for entry in entries.flatten() {
let path = entry.path();
if !path.is_dir() {
continue;
}
if target_canon
.as_ref()
.and_then(|c| std::fs::canonicalize(&path).ok().map(|p| p == *c))
.unwrap_or(false)
{
continue;
}
if path
.file_name()
.and_then(|n| n.to_str())
.is_some_and(|n| n.contains("__warm__"))
{
continue;
}
let meta_path = path.join("metadata.json");
let Ok(text) = std::fs::read_to_string(&meta_path) else {
continue;
};
let Ok(meta): Result<serde_json::Value, _> = serde_json::from_str(&text) else {
continue;
};
if meta.get("supermachine_version").and_then(|v| v.as_str()) != Some("supermachine") {
continue;
}
let Some(meta_inputs) = meta.get("native_bake_inputs").and_then(|v| v.as_object()) else {
continue;
};
const TOPLEVEL_FALLBACK_KEYS: &[&str] = &["vcpus"];
let mut matched = true;
for &key in native_supermachine_cheap_input_keys() {
let from_inputs = meta_inputs.get(key);
let from_meta = from_inputs.or_else(|| {
if TOPLEVEL_FALLBACK_KEYS.contains(&key) {
meta.get(key)
} else {
None
}
});
if from_meta != cheap_obj.get(key) {
matched = false;
break;
}
}
if !matched {
continue;
}
if !metadata_snapshot_files_exist(&meta) {
continue;
}
let restore_path = path.join("restore.snap");
let Ok(stat) = std::fs::metadata(&restore_path) else {
continue;
};
if stat.len() == 0 {
continue;
}
let rank = meta
.get("baked_at")
.and_then(|v| v.as_str())
.map(ToOwned::to_owned)
.or_else(|| {
stat.modified().ok().and_then(|t| {
t.duration_since(std::time::UNIX_EPOCH)
.ok()
.map(|d| format!("@{}", d.as_secs()))
})
})
.unwrap_or_default();
if best.as_ref().is_none_or(|(r, _): &(String, _)| &rank > r) {
best = Some((rank, path));
}
}
let Some((_, sibling_dir)) = best else {
return Ok(None);
};
clone_sibling_snapshot(&sibling_dir, &target_dir, run_t0)
}
/// Clones a compatible sibling snapshot directory into `target_dir`.
///
/// Steps:
/// 1. Clone the payload files (`restore.snap`, `delta.squashfs`, `init.cpio.gz`, `env.json`;
///    absent ones are skipped) and any `volumes/*.pristine` images with `clonefile_one`. Each
///    is cloned to a per-call temporary name and renamed into place.
/// 2. Rewrite the sibling's `metadata.json` so that paths under `sibling_dir` point at
///    `target_dir`, refresh `name` and `baked_at`, and record `sibling_cloned_from`.
/// 3. Write the metadata last, via tmp + rename.
///
/// Returns `Ok(None)` when the sibling disappears mid-clone (any of its files, including a
/// pristine, vanishes). Payload files are removed first, so the caller can fall back to a
/// full bake.
///
/// # Errors
/// Any other I/O or JSON failure. If some target file had already been replaced, the payload
/// files are removed before returning. This prevents a target that mixes sibling payload with
/// stale metadata.
fn clone_sibling_snapshot(
    sibling_dir: &Path,
    target_dir: &Path,
    run_t0: Instant,
) -> Result<Option<NativeBakeResult>, String> {
    /// Clones `src` to `tmp` and renames it over `dst`.
    /// Returns `Ok(false)` when `src` disappeared before it could be cloned.
    fn clone_one_into_place(src: &Path, dst: &Path, tmp: &Path, what: &str) -> Result<bool, String> {
        let _ = std::fs::remove_file(tmp);
        if let Err(e) = clonefile_one(src, tmp) {
            let _ = std::fs::remove_file(tmp);
            if e.kind() == std::io::ErrorKind::NotFound {
                return Ok(false);
            }
            return Err(format!("{what} {} → {}: {e}", src.display(), tmp.display()));
        }
        std::fs::rename(tmp, dst).map_err(|e| {
            let _ = std::fs::remove_file(tmp);
            format!("rename {} → {}: {e}", tmp.display(), dst.display())
        })?;
        Ok(true)
    }

    /// Clones payload files and pristine images. Returns `Ok(false)` if the sibling vanished
    /// mid-clone. Sets `*touched` once any file in `target_dir` has been replaced.
    fn clone_payload(
        sibling_dir: &Path,
        target_dir: &Path,
        tmp_suffix: &str,
        touched: &mut bool,
    ) -> Result<bool, String> {
        const FILES: &[&str] = &["restore.snap", "delta.squashfs", "init.cpio.gz", "env.json"];
        for fname in FILES {
            let src = sibling_dir.join(fname);
            if !src.exists() {
                continue;
            }
            let dst = target_dir.join(fname);
            let tmp = target_dir.join(format!("{fname}{tmp_suffix}"));
            if !clone_one_into_place(&src, &dst, &tmp, "clonefile")? {
                return Ok(false);
            }
            *touched = true;
        }
        let sib_pristine_dir = sibling_dir.join("volumes");
        if !sib_pristine_dir.is_dir() {
            return Ok(true);
        }
        let tgt_pristine_dir = target_dir.join("volumes");
        std::fs::create_dir_all(&tgt_pristine_dir)
            .map_err(|e| format!("mkdir {}: {e}", tgt_pristine_dir.display()))?;
        let entries = match std::fs::read_dir(&sib_pristine_dir) {
            Ok(e) => e,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(format!("read pristine dir: {e}")),
        };
        for entry in entries.flatten() {
            let fname = entry.file_name();
            let Some(fname_str) = fname.to_str() else {
                continue;
            };
            if !fname_str.ends_with(".pristine") {
                continue;
            }
            let dst = tgt_pristine_dir.join(&fname);
            let tmp = tgt_pristine_dir.join(format!("{fname_str}{tmp_suffix}"));
            // The sibling metadata's `volumes[].pristine` paths are rewritten to point here, so
            // a pristine that vanishes mid-clone must abort like the payload files do.
            if !clone_one_into_place(&entry.path(), &dst, &tmp, "clonefile pristine")? {
                return Ok(false);
            }
            *touched = true;
        }
        Ok(true)
    }

    /// Writes `target_dir/metadata.json` derived from the sibling's metadata.
    /// Returns `Ok(false)` when the sibling's metadata vanished.
    fn write_metadata(sibling_dir: &Path, target_dir: &Path, tmp_suffix: &str) -> Result<bool, String> {
        let sib_meta_text = match std::fs::read_to_string(sibling_dir.join("metadata.json")) {
            Ok(t) => t,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(format!("read sibling metadata: {e}")),
        };
        let mut meta: serde_json::Value = serde_json::from_str(&sib_meta_text)
            .map_err(|e| format!("parse sibling metadata: {e}"))?;
        rewrite_meta_paths_walk(
            &mut meta,
            &sibling_dir.to_string_lossy(),
            &target_dir.to_string_lossy(),
        );
        if let Some(obj) = meta.as_object_mut() {
            let target_name = target_dir
                .file_name()
                .and_then(|n| n.to_str())
                .map(ToOwned::to_owned)
                .unwrap_or_default();
            obj.insert("name".to_owned(), serde_json::json!(target_name));
            let now_secs = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0);
            obj.insert("baked_at".to_owned(), serde_json::json!(rfc3339_z(now_secs)));
            obj.insert(
                "sibling_cloned_from".to_owned(),
                serde_json::json!(sibling_dir.display().to_string()),
            );
        }
        let new_meta_text =
            serde_json::to_string_pretty(&meta).map_err(|e| format!("encode meta: {e}"))?;
        let meta_dst = target_dir.join("metadata.json");
        let meta_tmp = target_dir.join(format!("metadata.json{tmp_suffix}"));
        std::fs::write(&meta_tmp, new_meta_text + "\n").map_err(|e| {
            let _ = std::fs::remove_file(&meta_tmp);
            format!("write meta tmp {}: {e}", meta_tmp.display())
        })?;
        std::fs::rename(&meta_tmp, &meta_dst).map_err(|e| {
            let _ = std::fs::remove_file(&meta_tmp);
            format!(
                "rename {} → {}: {e}",
                meta_tmp.display(),
                meta_dst.display()
            )
        })?;
        Ok(true)
    }

    let t0 = Instant::now();
    std::fs::create_dir_all(target_dir)
        .map_err(|e| format!("mkdir {}: {e}", target_dir.display()))?;
    let tmp_seq = TEMP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    let tmp_suffix = format!(".sibclone.tmp.{}.{}", std::process::id(), tmp_seq);
    let mut touched_target = false;
    let outcome = clone_payload(sibling_dir, target_dir, &tmp_suffix, &mut touched_target)
        .and_then(|complete| {
            if complete {
                write_metadata(sibling_dir, target_dir, &tmp_suffix)
            } else {
                Ok(false)
            }
        });
    match outcome {
        Ok(true) => {}
        Ok(false) => {
            cleanup_partial_sibling_clone(target_dir, &tmp_suffix);
            return Ok(None);
        }
        Err(e) => {
            // Only clean up if we actually replaced something; otherwise leave the target
            // exactly as we found it.
            if touched_target {
                cleanup_partial_sibling_clone(target_dir, &tmp_suffix);
            }
            return Err(e);
        }
    }
    let total_ms = elapsed_ms(run_t0);
    let clone_ms = t0.elapsed().as_millis() as u64;
    if trace_enabled() {
        eprintln!(
            "[sibling-clone] OK in {} ms: {} → {} (total bake {} ms)",
            clone_ms,
            sibling_dir.display(),
            target_dir.display(),
            total_ms,
        );
    }
    Ok(Some(NativeBakeResult {
        total_ms,
        timings: serde_json::json!({
            "total_ms": total_ms,
            "snapshot_reused": true,
            "sibling_cloned": true,
            "sibling": sibling_dir.display().to_string(),
            "clone_ms": clone_ms,
        }),
        reused: true,
    }))
}
/// Best-effort removal of a partially cloned sibling snapshot's payload files.
///
/// For each payload file (`restore.snap`, `delta.squashfs`, `init.cpio.gz`, `env.json`),
/// deletes both `<target_dir>/<name>` and its temporary `<target_dir>/<name><tmp_suffix>`.
/// Missing files and removal errors are ignored.
fn cleanup_partial_sibling_clone(target_dir: &Path, tmp_suffix: &str) {
    ["restore.snap", "delta.squashfs", "init.cpio.gz", "env.json"]
        .iter()
        .flat_map(|name| {
            [
                target_dir.join(name),
                target_dir.join(format!("{name}{tmp_suffix}")),
            ]
        })
        .for_each(|doomed| {
            let _ = std::fs::remove_file(doomed);
        });
}
/// Creates `dst` as a copy-on-write clone of `src`.
///
/// On macOS this calls `clonefile(2)`. Elsewhere it opens (creating/truncating) `dst` and
/// issues the `FICLONE` ioctl. On that path a failed clone removes `dst` before returning the
/// OS error. No byte-copy fallback is attempted here (see `cow_or_copy`).
pub(crate) fn clonefile_one(src: &Path, dst: &Path) -> std::io::Result<()> {
    #[cfg(target_os = "macos")]
    fn reflink(src: &Path, dst: &Path) -> std::io::Result<()> {
        use std::os::unix::ffi::OsStrExt;
        let to_cstring = |p: &Path| {
            std::ffi::CString::new(p.as_os_str().as_bytes())
                .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "path NUL"))
        };
        let src_c = to_cstring(src)?;
        let dst_c = to_cstring(dst)?;
        // SAFETY: both pointers come from live, NUL-terminated CStrings that outlive the call,
        // and flags = 0 requests no special behaviour.
        let rc = unsafe { libc::clonefile(src_c.as_ptr(), dst_c.as_ptr(), 0) };
        if rc == 0 {
            Ok(())
        } else {
            Err(std::io::Error::last_os_error())
        }
    }

    #[cfg(not(target_os = "macos"))]
    fn reflink(src: &Path, dst: &Path) -> std::io::Result<()> {
        use std::os::unix::io::AsRawFd;
        // Request number matching Linux's FICLONE (_IOW(0x94, 9, int)).
        const FICLONE: libc::c_ulong = 0x4004_9409;
        let source = std::fs::File::open(src)?;
        let target = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(dst)?;
        // SAFETY: both descriptors belong to Files that stay open for the duration of the call;
        // FICLONE takes the source fd as its integer argument.
        let rc = unsafe { libc::ioctl(target.as_raw_fd(), FICLONE, source.as_raw_fd()) };
        if rc == 0 {
            Ok(())
        } else {
            let err = std::io::Error::last_os_error();
            let _ = std::fs::remove_file(dst);
            Err(err)
        }
    }

    reflink(src, dst)
}
/// Copies `src` to `dst`, preferring a copy-on-write clone and falling back to a byte copy.
///
/// Any existing `dst` is removed first (errors ignored). If `clonefile_one` fails for any
/// reason, `std::fs::copy` is used instead, and its error (if any) is what gets returned.
pub(crate) fn cow_or_copy(src: &Path, dst: &Path) -> std::io::Result<()> {
    let _ = std::fs::remove_file(dst);
    clonefile_one(src, dst).or_else(|_clone_err| std::fs::copy(src, dst).map(|_copied| ()))
}
/// Snapshots each volume's host file into `out_dir/volumes/<index>.pristine`.
///
/// Returns the pristine paths in the same order as `volumes`. An empty `volumes` returns an
/// empty list without creating the directory. Copies use `cow_or_copy`.
///
/// # Errors
/// Fails if the pristine directory cannot be created or any copy fails (stops at the first).
pub(crate) fn capture_volume_pristines(
    out_dir: &Path,
    volumes: &[VolumeMapping],
) -> Result<Vec<PathBuf>, String> {
    if volumes.is_empty() {
        return Ok(Vec::new());
    }
    let pristine_dir = out_dir.join("volumes");
    if let Err(e) = std::fs::create_dir_all(&pristine_dir) {
        return Err(format!("create pristine dir {}: {e}", pristine_dir.display()));
    }
    volumes
        .iter()
        .enumerate()
        .map(|(index, volume)| {
            let pristine = pristine_dir.join(format!("{index}.pristine"));
            match cow_or_copy(&volume.host_file, &pristine) {
                Ok(()) => Ok(pristine),
                Err(e) => Err(format!(
                    "capture volume pristine {} → {}: {e}",
                    volume.host_file.display(),
                    pristine.display()
                )),
            }
        })
        .collect()
}
/// Recursively rewrites JSON string values that equal `src_prefix` or name a path beneath it,
/// replacing that prefix with `dst_prefix`. Object keys are left untouched.
///
/// Matching respects path-component boundaries. With `src_prefix = "/snaps/app"`, the value
/// `/snaps/app/restore.snap` is rewritten, but `/snaps/app-old/restore.snap` is not. Without
/// the boundary check, unrelated paths that merely share a textual prefix (e.g. another
/// snapshot directory) would be corrupted. An empty `src_prefix` matches nothing.
fn rewrite_meta_paths_walk(v: &mut serde_json::Value, src_prefix: &str, dst_prefix: &str) {
    if src_prefix.is_empty() {
        return;
    }
    match v {
        serde_json::Value::String(s) => {
            if let Some(tail) = s.strip_prefix(src_prefix) {
                let at_component_boundary =
                    tail.is_empty() || tail.starts_with('/') || src_prefix.ends_with('/');
                if at_component_boundary {
                    *s = format!("{dst_prefix}{tail}");
                }
            }
        }
        serde_json::Value::Array(arr) => {
            for item in arr.iter_mut() {
                rewrite_meta_paths_walk(item, src_prefix, dst_prefix);
            }
        }
        serde_json::Value::Object(obj) => {
            for val in obj.values_mut() {
                rewrite_meta_paths_walk(val, src_prefix, dst_prefix);
            }
        }
        _ => {}
    }
}
/// Formats seconds since the Unix epoch as an RFC 3339 UTC timestamp,
/// `YYYY-MM-DDTHH:MM:SSZ`, without pulling in a date library.
///
/// The calendar conversion is Howard Hinnant's `civil_from_days` algorithm. It works in
/// 400-year "eras" of 146 097 days, and uses a year that starts on March 1 so that the
/// leap day falls at the end of the year.
fn rfc3339_z(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    let days = (secs / SECS_PER_DAY) as i64;
    let time_of_day = (secs % SECS_PER_DAY) as u32;
    let hour = time_of_day / 3600;
    let min = time_of_day % 3600 / 60;
    let sec = time_of_day % 60;

    // Shift the epoch to 0000-03-01 and split into eras.
    let shifted = days + 719_468;
    let era = shifted.div_euclid(146_097);
    let day_of_era = (shifted - era * 146_097) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March … 11 = February
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era as i64 + era * 400 + i64::from(month <= 2);
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{min:02}:{sec:02}Z")
}
/// Computes the native bake key and the inputs it was derived from.
///
/// The inputs are `native_supermachine_base_inputs` plus `layers` (the layer SHAs) and
/// `delta_key`, inserted in that order. The key is the `sha256_text` of the compact JSON
/// encoding followed by a newline.
///
/// # Errors
/// Fails if the delta has no key, the base inputs cannot be built or are not a JSON object,
/// encoding fails, or hashing fails.
fn native_supermachine_bake_key(
    plan: &BakePlan<'_>,
    resolution: &ImageResolution,
    layer_plan: &LayerPlan,
    delta: &DeltaMaterialization,
    root: &Path,
) -> Result<(String, serde_json::Value), String> {
    let Some(delta_key) = delta.key.as_deref() else {
        return Err("native supermachine bake needs delta key".to_owned());
    };
    let mut inputs = native_supermachine_base_inputs(plan, resolution, root)?;
    let Some(fields) = inputs.as_object_mut() else {
        return Err("native bake inputs was not an object".to_owned());
    };
    fields.insert("layers".to_owned(), serde_json::json!(layer_plan.layer_shas));
    fields.insert("delta_key".to_owned(), serde_json::json!(delta_key));
    let encoded = match serde_json::to_string(&inputs) {
        Ok(text) => text,
        Err(e) => return Err(format!("encode native bake key: {e}")),
    };
    let key = sha256_text(&format!("{encoded}\n"))?;
    Ok((key, inputs))
}
fn metadata_string(value: &serde_json::Value, key: &str) -> Option<String> {
value.get(key)?.as_str().map(ToOwned::to_owned)
}
/// Interprets the string at `meta[key]` as a filesystem path.
fn metadata_path_value(meta: &serde_json::Value, key: &str) -> Option<PathBuf> {
    Some(PathBuf::from(metadata_string(meta, key)?))
}
/// `true` when `meta[key]` is a string naming a path that currently exists.
fn metadata_path_exists(meta: &serde_json::Value, key: &str) -> bool {
    metadata_path_value(meta, key).is_some_and(|path| path.exists())
}
/// Global switch for snapshot reuse. It is currently hard-wired to `false`, so the reuse
/// paths that consult it are never short-circuited by it.
fn snapshot_reuse_disabled() -> bool {
    const REUSE_DISABLED: bool = false;
    REUSE_DISABLED
}
/// Command recorded in this plan's existing `metadata.json`, if it was baked from the same
/// image.
///
/// Returns `None` in any of these cases:
/// - the metadata is missing or unparsable;
/// - `image` differs from the plan's image;
/// - `cmd` is absent, not an array, or contains a non-string;
/// - `cmd` is empty.
fn previous_metadata_cmd(plan: &BakePlan<'_>) -> Option<Vec<String>> {
    let text = std::fs::read_to_string(plan.metadata_path()).ok()?;
    let meta: serde_json::Value = serde_json::from_str(&text).ok()?;
    if meta.get("image").and_then(|v| v.as_str()) != Some(plan.image) {
        return None;
    }
    let cmd = meta
        .get("cmd")?
        .as_array()?
        .iter()
        .map(|part| part.as_str().map(str::to_owned))
        .collect::<Option<Vec<String>>>()?;
    (!cmd.is_empty()).then_some(cmd)
}
/// `true` when every file a snapshot depends on is present.
///
/// The required paths are:
/// - `snapshot_base`, `delta_squashfs` and `init_cpio`;
/// - a non-empty `layers` array whose entries are all strings naming existing paths.
fn metadata_snapshot_files_exist(meta: &serde_json::Value) -> bool {
    let core_files_present = ["snapshot_base", "delta_squashfs", "init_cpio"]
        .iter()
        .all(|key| metadata_path_exists(meta, key));
    if !core_files_present {
        return false;
    }
    let Some(layers) = meta.get("layers").and_then(serde_json::Value::as_array) else {
        return false;
    };
    !layers.is_empty()
        && layers
            .iter()
            .all(|layer| layer.as_str().is_some_and(|p| Path::new(p).exists()))
}
/// Whether reuse checks that skip full input resolution may be used for this plan.
///
/// Returns `false` when any of these flags is present: `--extra-file`,
/// `--inbound-tls-autogen`, `--inbound-tls-cert` or `--inbound-tls-key`.
fn native_supermachine_early_reuse_supported(plan: &BakePlan<'_>) -> bool {
    let needs_full_bake = arg_present(plan.extra_args, "--extra-file")
        || arg_present(plan.extra_args, "--inbound-tls-autogen")
        || arg_value(plan.extra_args, "--inbound-tls-cert").is_some()
        || arg_value(plan.extra_args, "--inbound-tls-key").is_some();
    !needs_full_bake
}
/// Reuses this plan's snapshot when every base input (resolved image included) still matches
/// the stored `native_bake_inputs` and the snapshot files exist.
///
/// Returns `Ok(None)` in any of these cases:
/// - reuse is disabled or unsupported for the plan;
/// - there is no metadata;
/// - the metadata is not a native supermachine bake;
/// - any input differs or is missing;
/// - snapshot files are missing.
///
/// # Errors
/// Fails when existing metadata cannot be read or parsed, or when the base inputs cannot be
/// built.
fn native_supermachine_early_reuse_snapshot(
    plan: &BakePlan<'_>,
    resolution: &ImageResolution,
    root: &Path,
    run_t0: Instant,
) -> Result<Option<NativeBakeResult>, String> {
    let usable = !snapshot_reuse_disabled() && native_supermachine_early_reuse_supported(plan);
    if !usable {
        return Ok(None);
    }
    let meta_path = plan.metadata_path();
    let text = match std::fs::read_to_string(&meta_path) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(format!("read metadata {}: {e}", meta_path.display())),
        Ok(text) => text,
    };
    let meta: serde_json::Value = serde_json::from_str(&text)
        .map_err(|e| format!("parse metadata {}: {e}", meta_path.display()))?;
    let is_native =
        meta.get("supermachine_version").and_then(serde_json::Value::as_str) == Some("supermachine");
    let stored_inputs = match (
        is_native,
        meta.get("native_bake_inputs").and_then(serde_json::Value::as_object),
    ) {
        (true, Some(inputs)) => inputs,
        _ => return Ok(None),
    };
    let current_inputs = native_supermachine_base_inputs(plan, resolution, root)?;
    let Some(current_fields) = current_inputs.as_object() else {
        return Err("native base inputs was not an object".to_owned());
    };
    let all_match = current_fields
        .iter()
        .all(|(key, value)| stored_inputs.get(key) == Some(value));
    if !all_match || !metadata_snapshot_files_exist(&meta) {
        return Ok(None);
    }
    Ok(Some(NativeBakeResult {
        total_ms: 0,
        timings: serde_json::json!({
            "total_ms": 0,
            "snapshot_reused": true,
            "early_snapshot_reused": true,
            "reuse_check_total_ms": elapsed_ms(run_t0),
        }),
        reused: true,
    }))
}
/// Reuses this plan's snapshot when its stored `native_bake_key` equals `expected_bake_key`
/// and the snapshot files still exist.
///
/// # Errors
/// Fails when existing metadata cannot be read or parsed.
fn native_supermachine_reuse_snapshot(
    plan: &BakePlan<'_>,
    expected_bake_key: &str,
    run_t0: Instant,
) -> Result<Option<NativeBakeResult>, String> {
    if snapshot_reuse_disabled() {
        return Ok(None);
    }
    let meta_path = plan.metadata_path();
    let text = match std::fs::read_to_string(&meta_path) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(format!("read metadata {}: {e}", meta_path.display())),
        Ok(text) => text,
    };
    let meta: serde_json::Value = serde_json::from_str(&text)
        .map_err(|e| format!("parse metadata {}: {e}", meta_path.display()))?;
    let is_native =
        meta.get("supermachine_version").and_then(serde_json::Value::as_str) == Some("supermachine");
    let key_matches =
        meta.get("native_bake_key").and_then(serde_json::Value::as_str) == Some(expected_bake_key);
    if !(is_native && key_matches && metadata_snapshot_files_exist(&meta)) {
        return Ok(None);
    }
    let total_ms = 0;
    Ok(Some(NativeBakeResult {
        total_ms,
        timings: serde_json::json!({
            "total_ms": total_ms,
            "snapshot_reused": true,
            "reuse_check_total_ms": elapsed_ms(run_t0),
        }),
        reused: true,
    }))
}
fn run_native_supermachine_bake(
plan: &BakePlan<'_>,
resolution: &ImageResolution,
layer_plan: &LayerPlan,
delta: &DeltaMaterialization,
root: &Path,
native_bake_key: &str,
native_bake_inputs: &serde_json::Value,
) -> Result<NativeBakeResult, String> {
let total_t0 = Instant::now();
let out_dir = plan.snapshots_dir.join(plan.snapshot_name());
std::fs::create_dir_all(&out_dir)
.map_err(|e| format!("create snapshot dir {}: {e}", out_dir.display()))?;
let delta_cache = delta
.cache_path
.as_ref()
.filter(|p| p.is_file())
.ok_or_else(|| "native supermachine bake needs materialized delta cache".to_owned())?;
let squash_t0 = Instant::now();
let delta_squashfs = out_dir.join("delta.squashfs");
std::fs::copy(delta_cache, &delta_squashfs).map_err(|e| {
format!(
"copy delta cache {} -> {}: {e}",
delta_cache.display(),
delta_squashfs.display()
)
})?;
let squashfs_ms = elapsed_ms(squash_t0);
let env_json = write_env_json(plan, resolution, &out_dir)?;
let (init_cpio, initramfs_ms) = build_initramfs(root, &out_dir)?;
let sm22_bin = supermachine_worker_bin(root);
let kernel = supermachine_kernel(root);
let snap_base = out_dir.join("restore.snap");
let log = out_dir.join("bake.log");
let _ = std::fs::remove_file(&snap_base);
let log_file = std::fs::File::create(&log)
.map_err(|e| format!("create bake log {}: {e}", log.display()))?;
let log_err = log_file
.try_clone()
.map_err(|e| format!("clone bake log fd: {e}"))?;
let listener_settle_ms = native_listener_settle_ms(plan);
let snapshot_after_ms = native_snapshot_after_ms(plan);
let host_time = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| format!("host time before epoch: {e}"))?
.as_secs();
let bake_t0 = Instant::now();
let tsi_token_hex = generate_tsi_token_hex()?;
let mut cmd = Command::new(&sm22_bin);
cmd.arg("--kernel")
.arg(&kernel)
.arg("--initramfs")
.arg(&init_cpio)
.arg("--tsi-token")
.arg(&tsi_token_hex);
let layer_paths: Vec<PathBuf> = layer_plan
.layer_shas
.iter()
.map(|sha| layer_plan.cache_dir.join(format!("{sha}.squashfs")))
.collect();
for layer in &layer_paths {
cmd.arg("--virtio-blk").arg(layer);
}
cmd.arg("--virtio-blk").arg(&delta_squashfs);
let volumes = parse_volume_args(plan.extra_args)?;
for vol in &volumes {
ensure_volume_host_file(vol, vol.size_bytes)?;
cmd.arg("--volume").arg(format!(
"{}:{}:{}",
vol.host_file.display(),
vol.guest_path,
vol.size_bytes,
));
}
for raw in arg_values(plan.extra_args, "--mount") {
let parts: Vec<&str> = raw.splitn(4, ':').collect();
let (host, tag, guest_path, policy) = match parts.len() {
3 => (parts[0], parts[1], parts[2], None),
4 => (parts[0], parts[1], parts[2], Some(parts[3])),
_ => {
return Err(format!(
"--mount expects HOST:TAG:GUEST_PATH[:POLICY], got {raw:?}"
));
}
};
if host.is_empty() || tag.is_empty() || guest_path.is_empty() {
return Err(format!(
"--mount HOST:TAG:GUEST_PATH has empty field: {raw:?}"
));
}
let abs = std::fs::canonicalize(host).map_err(|e| format!("--mount {host}: {e}"))?;
let encoded = match policy {
None => format!("{}:{}:{}", abs.display(), tag, guest_path),
Some(p) => format!("{}:{}:{}:{}", abs.display(), tag, guest_path, p),
};
cmd.arg("--mount").arg(encoded);
}
cmd.env(
"SUPERMACHINE_HOST_IPV6",
if crate::utils::net::cached_host_ipv6_route() {
"1"
} else {
"0"
},
);
cmd.arg("--memory")
.arg(plan.memory_mib.to_string())
.arg("--vcpus")
.arg(plan.vcpus.to_string())
.arg("--cmdline")
.arg({
let mut s = format!(
"earlycon=pl011,mmio32,0x09000000 console=ttyAMA0 quiet loglevel=4 \
tsi_hijack supermachine.host_time={host_time}"
);
for extra in arg_values(plan.extra_args, "--cmdline-extra") {
s.push(' ');
s.push_str(extra);
}
s
});
if volumes.is_empty() {
cmd.arg("--snapshot-on-listener")
.arg("--snapshot-after-ms")
.arg(snapshot_after_ms.to_string())
.arg("--quiesce-ms")
.arg(listener_settle_ms.to_string());
} else {
cmd.arg("--snapshot-at").arg("1");
}
cmd.arg("--snapshot-out")
.arg(&snap_base)
.arg("--env-file")
.arg(&env_json)
.stdout(Stdio::from(log_file))
.stderr(Stdio::from(log_err));
let mut child = cmd
.spawn()
.map_err(|e| format!("spawn supermachine bake: {e}"))?;
let mut listener_ready_ms = None;
let mut snapshot_trigger_ms = None;
let mut snapshot_line_ms = None;
let deadline = Instant::now() + std::time::Duration::from_secs(240);
while Instant::now() < deadline {
if listener_ready_ms.is_none() && log_has(&log, "listener readiness") {
listener_ready_ms = Some(elapsed_ms(bake_t0));
}
if snapshot_trigger_ms.is_none() && log_has(&log, "snapshot trigger (") {
snapshot_trigger_ms = Some(elapsed_ms(bake_t0));
}
if snapshot_line_ms.is_none() && log_has(&log, "snapshot (") {
snapshot_line_ms = Some(elapsed_ms(bake_t0));
}
if snap_base.is_file() {
let _ = child.wait();
break;
}
if child
.try_wait()
.map_err(|e| format!("poll supermachine bake: {e}"))?
.is_some()
{
break;
}
if let Some((first_line, stack)) = detect_kernel_panic(&log) {
let _ = child.kill();
let _ = child.wait();
return Err(encode_kernel_panic_err(&first_line, &stack));
}
if log_has_failure(&log) {
let _ = child.kill();
let _ = child.wait();
return Err(format!("supermachine bake failed; see {}", log.display()));
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
let vmm_bake_ms = elapsed_ms(bake_t0);
if !snap_base.is_file() {
if let Some((first_line, stack)) = detect_kernel_panic(&log) {
let _ = child.kill();
let _ = child.wait();
return Err(encode_kernel_panic_err(&first_line, &stack));
}
let _ = child.kill();
let _ = child.wait();
return Err(format!(
"supermachine snapshot timeout; see {}",
log.display()
));
}
if listener_ready_ms.is_none() && log_has(&log, "listener readiness") {
listener_ready_ms = Some(elapsed_ms(bake_t0));
}
if snapshot_trigger_ms.is_none() && log_has(&log, "snapshot trigger (") {
snapshot_trigger_ms = Some(elapsed_ms(bake_t0));
}
if snapshot_line_ms.is_none() && log_has(&log, "snapshot (") {
snapshot_line_ms = Some(elapsed_ms(bake_t0));
}
let snapshot_line = last_line_containing(&log, "snapshot (").unwrap_or_default();
let snapshot_reason = parse_snapshot_reason(&snapshot_line).unwrap_or_default();
let (snapshot_capture_us, snapshot_save_us) = parse_capture_save_us(&snapshot_line);
let (snapshot_ram_data_mib, snapshot_ram_zero_mib) = parse_ram_mib(&snapshot_line);
let snapshot_logical_bytes = std::fs::metadata(&snap_base).ok().map(|m| m.len());
let snapshot_physical_bytes = file_physical_bytes(&snap_base);
let listener_to_trigger_ms = listener_ready_ms
.zip(snapshot_trigger_ms)
.map(|(a, b)| b.saturating_sub(a));
let listener_to_snapshot_ms = listener_ready_ms
.zip(snapshot_line_ms)
.map(|(a, b)| b.saturating_sub(a));
let metadata_t0 = Instant::now();
let runtime_sha = sha256_file(&sm22_bin)?;
let runtime_sha16 = &runtime_sha[..runtime_sha.len().min(16)];
let hash_mode =
std::env::var("SUPERMACHINE_SNAPSHOT_HASH_MODE").unwrap_or_else(|_| "fast".to_owned());
let snapshot_id = compute_snapshot_id(plan, &snap_base, runtime_sha16, &hash_mode)?;
let baked_at = now_utc_iso()?;
let egress_policy = arg_value(plan.extra_args, "--egress-policy").unwrap_or("");
let balloon_target_pages = compute_balloon_target_pages(plan.memory_mib);
let metadata_prepare_ms = elapsed_ms(metadata_t0);
let total_ms = elapsed_ms(total_t0);
let timings = serde_json::json!({
"total_ms": total_ms,
"pull_inspect_ms": resolution.inspect_ms,
"rootfs_prepare_ms": layer_plan.plan_ms,
"rootfs_customize_ms": delta.prepare_ms,
"init_oci_build_ms": 0,
"squashfs_ms": squashfs_ms,
"initramfs_ms": initramfs_ms,
"vmm_bake_ms": vmm_bake_ms,
"metadata_prepare_ms": metadata_prepare_ms,
"guest_boot_to_listener_ms": listener_ready_ms,
"listener_settle_config_ms": listener_settle_ms,
"listener_to_snapshot_trigger_ms": listener_to_trigger_ms,
"listener_to_snapshot_ms": listener_to_snapshot_ms,
"snapshot_capture_us": snapshot_capture_us,
"snapshot_save_us": snapshot_save_us,
"snapshot_reason": snapshot_reason,
});
let parsed_volumes = parse_volume_args(plan.extra_args)?;
let volume_pristines = capture_volume_pristines(&out_dir, &parsed_volumes)?;
let volumes_meta: Vec<serde_json::Value> = parsed_volumes
.iter()
.zip(volume_pristines.iter())
.map(|(v, pristine)| {
serde_json::json!({
"host_file": v.host_file.to_string_lossy(),
"guest_path": v.guest_path,
"size_bytes": v.size_bytes,
"pristine": pristine.to_string_lossy(),
})
})
.collect();
let mounts_meta: Vec<serde_json::Value> = arg_values(plan.extra_args, "--mount")
.into_iter()
.filter_map(parse_mount_arg)
.collect();
let restart_policy = arg_value(plan.extra_args, "--restart").unwrap_or("no");
let health_cmd = arg_value(plan.extra_args, "--health-cmd").unwrap_or("");
let health_interval_secs: u32 = arg_value(plan.extra_args, "--health-interval")
.and_then(|s| s.parse().ok())
.unwrap_or(if health_cmd.is_empty() { 0 } else { 30 });
let metadata = serde_json::json!({
"name": plan.snapshot_name(),
"image": plan.image,
"port": plan.guest_port,
"memory_mib": plan.memory_mib,
"cmd": resolution.effective_cmd,
"snapshot_sha16": snapshot_id,
"snapshot_id16": snapshot_id,
"snapshot_hash_mode": hash_mode,
"runtime_sha16": runtime_sha16,
"layers": layer_paths,
"volumes": volumes_meta,
"mounts": mounts_meta,
"restart_policy": restart_policy,
"health_cmd": health_cmd,
"health_interval_secs": health_interval_secs,
"delta_squashfs": delta_squashfs,
"rootfs_squashfs": serde_json::Value::Null,
"snapshot_base": snap_base,
"snapshot_logical_bytes": snapshot_logical_bytes,
"snapshot_physical_bytes": snapshot_physical_bytes,
"snapshot_ram_data_mib": snapshot_ram_data_mib,
"snapshot_ram_zero_mib": snapshot_ram_zero_mib,
"init_cpio": init_cpio,
"kernel": kernel,
"baked_by_version": env!("CARGO_PKG_VERSION"),
"egress_policy": egress_policy,
"vcpus": plan.vcpus,
"ttl_seconds": serde_json::Value::Null,
"egress_bps": serde_json::Value::Null,
"cpu_nice": serde_json::Value::Null,
"cpu_affinity": serde_json::Value::Null,
"cpu_qos": serde_json::Value::Null,
"balloon_target_pages": balloon_target_pages,
"auth": {"type": "none"},
"native_bake_key": native_bake_key,
"native_bake_inputs": native_bake_inputs,
"timings": timings,
"baked_at": baked_at,
"supermachine_version": "supermachine",
"tsi_token": tsi_token_hex,
});
let meta_path = out_dir.join("metadata.json");
std::fs::write(
&meta_path,
serde_json::to_vec_pretty(&metadata).map_err(|e| format!("encode metadata: {e}"))?,
)
.map_err(|e| format!("write metadata {}: {e}", meta_path.display()))?;
Ok(NativeBakeResult {
total_ms,
timings,
reused: false,
})
}
/// How a pipelined bake uses the kernel boot cache snapshot (enabled via `SUPERMACHINE_KCACHE`).
///
/// - `Disabled`: the cache is not in use.
/// - `Write`: the cache is enabled, but no cached snapshot file exists yet at the cache path.
/// - `Hit`: the cache is enabled and a cached snapshot file already exists.
// The former `#[allow(clippy::too_many_arguments)]` here was a stray function-lint attribute
// with no effect on an enum, so it was dropped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KcacheMode {
    Disabled,
    Write,
    Hit,
}
/// Reads the next supervisor protocol line into `line`, skipping asynchronous save
/// notifications (lines whose trimmed text starts with `SAVE_DONE ` or `SAVE_FAIL `).
///
/// `line` is cleared before every read and holds the raw line (including its newline) on
/// success.
///
/// # Errors
/// `UnexpectedEof` if the stream closes before a non-notification line arrives; other I/O
/// errors are propagated.
fn read_supervisor_line_skip_save_notifications(
    reader: &mut std::io::BufReader<std::os::unix::net::UnixStream>,
    line: &mut String,
) -> std::io::Result<()> {
    use std::io::BufRead;
    const SAVE_NOTIFICATION_PREFIXES: [&str; 2] = ["SAVE_DONE ", "SAVE_FAIL "];
    loop {
        line.clear();
        if reader.read_line(line)? == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "supervisor closed mid-protocol",
            ));
        }
        let content = line.trim();
        let is_save_notification = SAVE_NOTIFICATION_PREFIXES
            .iter()
            .any(|prefix| content.starts_with(*prefix));
        if !is_save_notification {
            return Ok(());
        }
    }
}
/// Bake deadline in seconds, read from `SUPERMACHINE_BAKE_DEADLINE_SECS` (surrounding
/// whitespace ignored). Falls back to 1200 when the variable is unset or not a valid `u64`.
// NOTE(review): presumably fed to `BakeDeadlineGuard::arm`, where 0 disables the watchdog.
fn bake_deadline_secs() -> u64 {
    const DEFAULT_DEADLINE_SECS: u64 = 1200;
    match std::env::var("SUPERMACHINE_BAKE_DEADLINE_SECS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_DEADLINE_SECS),
        Err(_) => DEFAULT_DEADLINE_SECS,
    }
}
/// Watchdog that sends `SIGKILL` to a bake worker if it is still armed after a deadline.
///
/// When the deadline fires, `timed_out` is set and a message is printed to stderr before the
/// kill. Dropping the guard disarms it, wakes the watchdog thread immediately, and joins it.
/// If the watchdog thread cannot be spawned, the guard is silently inert (best effort).
struct BakeDeadlineGuard {
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    handle: Option<std::thread::JoinHandle<()>>,
}
impl BakeDeadlineGuard {
    /// Arms a watchdog for `secs` seconds; `secs == 0` returns a guard with no watchdog.
    fn arm(
        child_pid: i32,
        secs: u64,
        timed_out: std::sync::Arc<std::sync::atomic::AtomicBool>,
    ) -> Self {
        if secs == 0 {
            return Self {
                stop: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
                handle: None,
            };
        }
        Self::arm_with(child_pid, Duration::from_secs(secs), timed_out)
    }
    /// Arms a watchdog that fires after `dur` unless the guard is dropped first.
    fn arm_with(
        child_pid: i32,
        dur: Duration,
        timed_out: std::sync::Arc<std::sync::atomic::AtomicBool>,
    ) -> Self {
        use std::sync::atomic::Ordering;
        let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let stop_c = std::sync::Arc::clone(&stop);
        let handle = std::thread::Builder::new()
            .name("supermachine-bake-deadline".into())
            .stack_size(64 * 1024)
            .spawn(move || {
                let deadline = Instant::now() + dur;
                while !stop_c.load(Ordering::Acquire) {
                    let now = Instant::now();
                    if now >= deadline {
                        timed_out.store(true, Ordering::Release);
                        eprintln!(
                            "[supermachine] bake exceeded {}s deadline \
                             (stuck warmup?); SIGKILL worker pid {child_pid}",
                            dur.as_secs()
                        );
                        // SAFETY: kill(2) has no memory-safety preconditions; it only takes
                        // a pid and a signal number.
                        unsafe {
                            libc::kill(child_pid, libc::SIGKILL);
                        }
                        return;
                    }
                    // Park rather than sleep in fixed steps so that `Drop` can wake us at once via
                    // `unpark`. An unpark issued before we park is not lost, and spurious wakeups
                    // just re-run the loop.
                    std::thread::park_timeout(deadline - now);
                }
            })
            .ok();
        Self { stop, handle }
    }
}
impl Drop for BakeDeadlineGuard {
    fn drop(&mut self) {
        self.stop.store(true, std::sync::atomic::Ordering::Release);
        if let Some(h) = self.handle.take() {
            // Wake the watchdog so the join below returns promptly instead of waiting out
            // its park timeout.
            h.thread().unpark();
            let _ = h.join();
        }
    }
}
fn run_native_supermachine_bake_pipelined(
plan: &BakePlan<'_>,
resolution: &ImageResolution,
layer_plan: &LayerPlan,
delta: &DeltaMaterialization,
root: &Path,
native_bake_key: &str,
native_bake_inputs: &serde_json::Value,
pipelined: PipelinedWarmup<'_>,
keep_alive_out: &mut Option<BakedWorker>,
) -> Result<NativeBakeResult, String> {
use std::io::{BufReader, Write};
use std::os::unix::net::UnixListener;
let total_t0 = Instant::now();
let out_dir = plan.snapshots_dir.join(plan.snapshot_name());
std::fs::create_dir_all(&out_dir)
.map_err(|e| format!("create snapshot dir {}: {e}", out_dir.display()))?;
let kcache_enabled = std::env::var("SUPERMACHINE_KCACHE")
.as_deref()
.map(|v| v == "1" || v == "true")
.unwrap_or(false)
&& pipelined.skip_warm_snapshot;
let kcache_path = if kcache_enabled {
kernel_boot_cache_snap_path(plan, root).ok()
} else {
None
};
let kcache_mode: KcacheMode = match (kcache_enabled, kcache_path.as_ref()) {
(true, Some(p)) if p.is_file() => KcacheMode::Hit,
(true, Some(_)) => KcacheMode::Write,
_ => KcacheMode::Disabled,
};
if matches!(kcache_mode, KcacheMode::Write | KcacheMode::Hit) {
if let Some(p) = kcache_path.as_ref() {
if let Some(parent) = p.parent() {
let _ = std::fs::create_dir_all(parent);
}
}
}
if trace_enabled() {
eprintln!(
"[kcache] mode={:?} path={:?}",
kcache_mode,
kcache_path.as_ref().map(|p| p.display().to_string())
);
}
if !pipelined.skip_warm_snapshot {
std::fs::create_dir_all(&pipelined.warm_dir).map_err(|e| {
format!(
"create warm snapshot dir {}: {e}",
pipelined.warm_dir.display()
)
})?;
}
let delta_cache = delta
.cache_path
.as_ref()
.filter(|p| p.is_file())
.ok_or_else(|| "pipelined bake needs materialized delta cache".to_owned())?;
let squash_t0 = Instant::now();
let delta_squashfs = out_dir.join("delta.squashfs");
std::fs::copy(delta_cache, &delta_squashfs).map_err(|e| {
format!(
"copy delta cache {} -> {}: {e}",
delta_cache.display(),
delta_squashfs.display()
)
})?;
let squashfs_ms = elapsed_ms(squash_t0);
let env_json = write_env_json(plan, resolution, &out_dir)?;
let (init_cpio, initramfs_ms) = build_initramfs(root, &out_dir)?;
let sm22_bin = supermachine_worker_bin(root);
let kernel = supermachine_kernel(root);
let snap_base = out_dir.join("restore.snap");
let snap_warm = pipelined.warm_dir.join("restore.snap");
let log = out_dir.join("bake.log");
let _ = std::fs::remove_file(&snap_base);
let _ = std::fs::remove_file(&snap_warm);
let log_file = std::fs::File::create(&log)
.map_err(|e| format!("create bake log {}: {e}", log.display()))?;
let log_err = log_file
.try_clone()
.map_err(|e| format!("clone bake log fd: {e}"))?;
let listener_settle_ms = native_listener_settle_ms(plan);
let snapshot_after_ms = native_snapshot_after_ms(plan);
let host_time = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| format!("host time before epoch: {e}"))?
.as_secs();
let suffix = format!(
"{:x}-{}",
std::process::id(),
TEMP_COUNTER.fetch_add(1, Ordering::Relaxed)
);
let socks_dir = std::path::PathBuf::from(format!("/tmp/supermachine-bake-{suffix}"));
std::fs::create_dir_all(&socks_dir)
.map_err(|e| format!("create bake socks dir {}: {e}", socks_dir.display()))?;
let vsock_mux_path = socks_dir.join("vsock-mux.sock");
let vsock_exec_path = socks_dir.join("vsock-exec.sock");
let ctl_path = socks_dir.join("ctl.sock");
let _ = std::fs::remove_file(&vsock_mux_path);
let _ = std::fs::remove_file(&vsock_exec_path);
let _ = std::fs::remove_file(&ctl_path);
let ctl_listener = UnixListener::bind(&ctl_path)
.map_err(|e| format!("bind bake ctl socket {}: {e}", ctl_path.display()))?;
let bake_t0 = Instant::now();
let tsi_token_hex = generate_tsi_token_hex()?;
let mut cmd = Command::new(&sm22_bin);
cmd.arg("--kernel")
.arg(&kernel)
.arg("--initramfs")
.arg(&init_cpio)
.arg("--tsi-token")
.arg(&tsi_token_hex);
let layer_paths: Vec<PathBuf> = layer_plan
.layer_shas
.iter()
.map(|sha| layer_plan.cache_dir.join(format!("{sha}.squashfs")))
.collect();
for layer in &layer_paths {
cmd.arg("--virtio-blk").arg(layer);
}
cmd.arg("--virtio-blk").arg(&delta_squashfs);
let volumes = parse_volume_args(plan.extra_args)?;
for vol in &volumes {
ensure_volume_host_file(vol, vol.size_bytes)?;
cmd.arg("--volume").arg(format!(
"{}:{}:{}",
vol.host_file.display(),
vol.guest_path,
vol.size_bytes,
));
}
for raw in arg_values(plan.extra_args, "--mount") {
let parts: Vec<&str> = raw.splitn(4, ':').collect();
let (host, tag, guest_path, policy) = match parts.len() {
3 => (parts[0], parts[1], parts[2], None),
4 => (parts[0], parts[1], parts[2], Some(parts[3])),
_ => {
return Err(format!(
"--mount expects HOST:TAG:GUEST_PATH[:POLICY], got {raw:?}"
));
}
};
if host.is_empty() || tag.is_empty() || guest_path.is_empty() {
return Err(format!(
"--mount HOST:TAG:GUEST_PATH has empty field: {raw:?}"
));
}
let abs = std::fs::canonicalize(host).map_err(|e| format!("--mount {host}: {e}"))?;
let encoded = match policy {
None => format!("{}:{}:{}", abs.display(), tag, guest_path),
Some(p) => format!("{}:{}:{}:{}", abs.display(), tag, guest_path, p),
};
cmd.arg("--mount").arg(encoded);
}
cmd.env(
"SUPERMACHINE_HOST_IPV6",
if crate::utils::net::cached_host_ipv6_route() {
"1"
} else {
"0"
},
);
cmd.arg("--memory")
.arg(plan.memory_mib.to_string())
.arg("--vcpus")
.arg(plan.vcpus.to_string())
.arg("--cmdline")
.arg({
let mut s = format!(
"earlycon=pl011,mmio32,0x09000000 console=ttyAMA0 quiet loglevel=4 \
tsi_hijack supermachine.host_time={host_time}"
);
if !matches!(kcache_mode, KcacheMode::Disabled) {
s.push_str(" supermachine.kcache=1");
}
let pre_exec_sync_enabled = pipelined.use_pre_exec_trigger
&& std::env::var("SUPERMACHINE_PRE_EXEC_SYNC")
.map(|v| v != "0" && v != "false")
.unwrap_or(true);
if pre_exec_sync_enabled {
s.push_str(" supermachine.pre_exec_sync=1");
}
for extra in arg_values(plan.extra_args, "--cmdline-extra") {
s.push(' ');
s.push_str(extra);
}
s
});
if !matches!(kcache_mode, KcacheMode::Disabled) {
cmd.env("SUPERMACHINE_KCACHE_AUTO_PUSH", "0");
}
if matches!(kcache_mode, KcacheMode::Hit) {
cmd.env("SUPERMACHINE_KCACHE_RESUME", "1");
if let Some(p) = kcache_path.as_ref() {
cmd.arg("--kcache-restore-from").arg(p);
}
}
if volumes.is_empty() {
let after_ms = if pipelined.use_pre_exec_trigger {
30_000
} else {
snapshot_after_ms
};
cmd.arg("--snapshot-on-listener")
.arg("--snapshot-after-ms")
.arg(after_ms.to_string())
.arg("--quiesce-ms")
.arg(listener_settle_ms.to_string());
if pipelined.use_pre_exec_trigger {
cmd.arg("--snapshot-on-pre-exec");
}
} else {
cmd.arg("--snapshot-at").arg("1");
}
cmd.arg("--env-file")
.arg(&env_json)
.arg("--vsock-mux")
.arg(&vsock_mux_path)
.arg("--vsock-exec")
.arg(&vsock_exec_path)
.arg("--pool-worker")
.arg(&ctl_path)
.stdout(Stdio::from(log_file))
.stderr(Stdio::from(log_err));
let spawn_t0 = Instant::now();
let mut child = cmd
.spawn()
.map_err(|e| format!("spawn pipelined-bake worker: {e}"))?;
if crate::trace::enabled("phases") {
eprintln!(
"[phases] worker spawn ({:?} since bake start)",
total_t0.elapsed(),
);
}
let bake_timed_out = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let _bake_deadline_guard = BakeDeadlineGuard::arm(
child.id() as i32,
bake_deadline_secs(),
bake_timed_out.clone(),
);
let panic_info: std::sync::Arc<std::sync::Mutex<Option<(String, Vec<String>)>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let watcher_stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let (watcher_stop_tx, watcher_stop_rx) = std::sync::mpsc::channel::<()>();
let watcher_handle = {
let panic_info = panic_info.clone();
let watcher_stop = watcher_stop.clone();
let log_for_watcher = log.clone();
let child_pid = child.id() as i32;
std::thread::Builder::new()
.name("supermachine-bake-panic-watcher".into())
.stack_size(64 * 1024)
.spawn(move || {
loop {
if watcher_stop.load(std::sync::atomic::Ordering::Acquire) {
return;
}
if let Some((first, stack)) = detect_kernel_panic(&log_for_watcher) {
*panic_info.lock().unwrap() = Some((first, stack));
unsafe {
libc::kill(child_pid, libc::SIGKILL);
}
return;
}
match watcher_stop_rx.recv_timeout(Duration::from_millis(100)) {
Ok(()) => return,
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => return,
}
}
})
.expect("spawn supermachine-bake-panic-watcher")
};
let take_panic_err = |fallback: String| -> String {
if bake_timed_out.load(std::sync::atomic::Ordering::Acquire) {
return format!(
"pipelined bake: exceeded the {}s deadline and the worker was killed \
(a stuck warmup — the guest idled in WFI with no forward progress). \
Raise or disable via SUPERMACHINE_BAKE_DEADLINE_SECS. See {}",
bake_deadline_secs(),
log.display()
);
}
if let Some((first_line, stack)) = panic_info.lock().unwrap().take() {
encode_kernel_panic_err(&first_line, &stack)
} else {
fallback
}
};
ctl_listener
.set_nonblocking(true)
.map_err(|e| format!("set ctl listener nonblocking: {e}"))?;
let deadline = Instant::now() + Duration::from_secs(10);
let mut backoff = Duration::from_millis(1);
let stream = loop {
match ctl_listener.accept() {
Ok((s, _)) => break s,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
if Instant::now() > deadline {
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"pipelined bake: worker ctl connect did not arrive within 10s; \
see {}",
log.display()
));
}
std::thread::sleep(backoff);
backoff = (backoff * 2).min(Duration::from_millis(10));
}
Err(e) => {
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!("pipelined bake: ctl accept: {e}"));
}
}
};
stream
.set_nonblocking(false)
.map_err(|e| format!("set ctl stream blocking: {e}"))?;
let writer = stream
.try_clone()
.map_err(|e| format!("clone ctl stream: {e}"))?;
let mut reader = BufReader::new(stream);
let mut writer = writer;
let mut line = String::new();
if let Err(e) = read_supervisor_line_skip_save_notifications(&mut reader, &mut line) {
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(take_panic_err(format!("pipelined bake: read READY: {e}")));
}
if crate::trace::enabled("phases") {
eprintln!(
"[phases] READY received ({:?} since bake start, {:?} since spawn)",
total_t0.elapsed(),
spawn_t0.elapsed(),
);
}
if line.trim() != "READY" {
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(take_panic_err(format!(
"pipelined bake: expected READY, got {:?}; see {}",
line.trim(),
log.display()
)));
}
let bake_ready_t0 = Instant::now();
let mut listener_ready_ms: Option<u128> = None;
let mut kcache_write_done = false;
if crate::trace::enabled("phases") {
eprintln!(
"[phases] entering BAKE_READY loop ({:?} since bake start)",
total_t0.elapsed(),
);
}
loop {
if let Err(e) = read_supervisor_line_skip_save_notifications(&mut reader, &mut line) {
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(take_panic_err(format!(
"pipelined bake: read BAKE_READY: {e}"
)));
}
let trimmed = line.trim().to_owned();
if trimmed == "BAKE_READY" {
if crate::trace::enabled("phases") {
eprintln!(
"[phases] BAKE_READY received ({:?} since bake start, \
{:?} since READY)",
total_t0.elapsed(),
bake_ready_t0.elapsed(),
);
}
break;
}
if trimmed == "MARKER_KCACHE_READY" {
if matches!(kcache_mode, KcacheMode::Write) && !kcache_write_done {
if let Some(p) = kcache_path.as_ref() {
let cache_t0 = Instant::now();
let snap_path_str = p.to_string_lossy();
if let Err(e) = writeln!(writer, "SNAPSHOT_ASYNC {snap_path_str}")
.and_then(|()| writer.flush())
{
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!("kcache-write: send SNAPSHOT_ASYNC: {e}"));
}
if let Err(e) =
read_supervisor_line_skip_save_notifications(&mut reader, &mut line)
{
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!("kcache-write: read DONE_SNAPSHOT_ASYNC: {e}"));
}
if !line.trim().starts_with("DONE_SNAPSHOT_ASYNC") {
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"kcache-write: bad SNAPSHOT_ASYNC response {:?}",
line.trim()
));
}
if let Err(e) = writeln!(writer, "PUSH_RX 52")
.and_then(|()| writeln!(writer, "PUSH_RX 0a"))
.and_then(|()| writer.flush())
{
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!("kcache-write: send PUSH_RX: {e}"));
}
if trace_enabled() {
eprintln!(
"[kcache-write] snap+push completed in {} ms",
elapsed_ms(cache_t0)
);
}
kcache_write_done = true;
}
}
continue;
}
break;
}
if line.trim() != "BAKE_READY" {
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(take_panic_err(format!(
"pipelined bake: expected BAKE_READY, got {:?}; see {}",
line.trim(),
log.display()
)));
}
let watcher_already_stopped = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_stop_tx.send(());
let watcher_join_t0 = Instant::now();
let watcher_handle = {
let _ = watcher_handle.join();
std::thread::spawn(|| {})
};
if crate::trace::enabled("phases") {
eprintln!(
"[phases] watcher_handle.join took {:?}",
watcher_join_t0.elapsed(),
);
}
let _ = watcher_already_stopped;
let bake_ready_ms = bake_ready_t0.elapsed().as_millis();
let log_has_t0 = Instant::now();
if log_has(&log, "listener readiness") {
listener_ready_ms = Some(elapsed_ms(bake_t0));
}
if crate::trace::enabled("phases") {
eprintln!(
"[phases] log_has(\"listener readiness\") took {:?}",
log_has_t0.elapsed(),
);
}
let parked_for_base = if plan.vcpus > 1 {
if crate::trace::enabled("phases") {
eprintln!(
"[phases] pre-base smpark_park_via_agent enter ({:?} since bake start)",
total_t0.elapsed(),
);
}
let r = smpark_park_via_agent(&vsock_exec_path);
if crate::trace::enabled("phases") {
eprintln!(
"[phases] pre-base smpark_park done ({:?} since bake start)",
total_t0.elapsed(),
);
}
r
} else {
false
};
let async_send_t0 = Instant::now();
if crate::trace::enabled("phases") {
eprintln!(
"[phases] sending SNAPSHOT_ASYNC ({:?} since bake start)",
total_t0.elapsed(),
);
}
let snap_base_str = snap_base
.to_str()
.ok_or_else(|| "snap_base path not UTF-8".to_owned())?;
writeln!(writer, "SNAPSHOT_ASYNC {snap_base_str}")
.map_err(|e| format!("write SNAPSHOT_ASYNC: {e}"))?;
writer
.flush()
.map_err(|e| format!("flush SNAPSHOT_ASYNC: {e}"))?;
read_supervisor_line_skip_save_notifications(&mut reader, &mut line)
.map_err(|e| format!("read DONE_SNAPSHOT_ASYNC: {e}"))?;
if crate::trace::enabled("phases") {
eprintln!(
"[phases] DONE_SNAPSHOT_ASYNC received ({:?} since bake start, \
{:?} since send)",
total_t0.elapsed(),
async_send_t0.elapsed(),
);
}
let line_trim = line.trim();
let mut base_capture_us: u64 = 0;
if let Some(rest) = line_trim.strip_prefix("DONE_SNAPSHOT_ASYNC") {
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("capture_us=") {
base_capture_us = v.parse().unwrap_or(0);
}
}
} else if let Some(rest) = line_trim.strip_prefix("ERR_SNAPSHOT ") {
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"pipelined bake: base snapshot failed: {}; see {}",
rest.trim(),
log.display()
));
} else {
let _ = child.kill();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"pipelined bake: bad SNAPSHOT_ASYNC response {:?}; see {}",
line_trim,
log.display()
));
}
let async_send_ms = async_send_t0.elapsed().as_millis();
if parked_for_base {
match smpark_unpark_via_supervisor(&mut writer, &mut reader) {
Ok(true) => { }
Ok(false) => {
let _ = smpark_unpark_via_agent(&vsock_exec_path);
}
Err(e) => {
eprintln!("[bake] smpark_unpark_direct transport error ({e}); falling back");
let _ = smpark_unpark_via_agent(&vsock_exec_path);
}
}
}
let warmup_t0 = Instant::now();
let warmup_ctx = PipelinedWarmupContext {
vsock_mux_path: vsock_mux_path.clone(),
vsock_exec_path: vsock_exec_path.clone(),
};
if let Err(e) = (pipelined.callback)(&warmup_ctx) {
let _ = writeln!(writer, "QUIT");
let _ = writer.flush();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(take_panic_err(format!(
"pipelined bake: warmup callback failed: {e}"
)));
}
let warmup_ms = warmup_t0.elapsed().as_millis();
let parked_for_warm = if plan.vcpus > 1 && !pipelined.skip_warm_snapshot {
if crate::trace::enabled("phases") {
eprintln!(
"[phases] pre-warm smpark_park_via_agent enter ({:?} since bake start)",
total_t0.elapsed(),
);
}
let r = smpark_park_via_agent(&vsock_exec_path);
if crate::trace::enabled("phases") {
eprintln!(
"[phases] pre-warm smpark_park done ({:?} since bake start)",
total_t0.elapsed(),
);
}
r
} else {
false
};
let mut warm_save_us: u64 = 0;
let mut warm_bytes: u64 = 0;
let warm_send_ms: u128;
if !pipelined.skip_warm_snapshot {
let warm_send_t0 = Instant::now();
let snap_warm_str = snap_warm
.to_str()
.ok_or_else(|| "snap_warm path not UTF-8".to_owned())?;
let snap_base_str_for_warm = snap_base
.to_str()
.ok_or_else(|| "snap_base path not UTF-8".to_owned())?;
writeln!(
writer,
"SNAPSHOT {snap_warm_str} base={snap_base_str_for_warm}"
)
.map_err(|e| format!("write SNAPSHOT: {e}"))?;
writer.flush().map_err(|e| format!("flush SNAPSHOT: {e}"))?;
read_supervisor_line_skip_save_notifications(&mut reader, &mut line)
.map_err(|e| format!("read DONE_SNAPSHOT: {e}"))?;
let line_trim = line.trim();
if let Some(rest) = line_trim.strip_prefix("DONE_SNAPSHOT") {
for kv in rest.split_ascii_whitespace() {
if let Some(v) = kv.strip_prefix("save_us=") {
warm_save_us = v.parse().unwrap_or(0);
} else if let Some(v) = kv.strip_prefix("bytes_written=") {
warm_bytes = v.parse().unwrap_or(0);
}
}
} else if let Some(rest) = line_trim.strip_prefix("ERR_SNAPSHOT ") {
let _ = writeln!(writer, "QUIT");
let _ = writer.flush();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"pipelined bake: warm snapshot failed: {}; see {}",
rest.trim(),
log.display()
));
} else {
let _ = writeln!(writer, "QUIT");
let _ = writer.flush();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"pipelined bake: bad SNAPSHOT response {:?}; see {}",
line_trim,
log.display()
));
}
warm_send_ms = warm_send_t0.elapsed().as_millis();
} else {
warm_send_ms = 0;
}
if parked_for_warm {
match smpark_unpark_via_supervisor(&mut writer, &mut reader) {
Ok(true) => {}
Ok(false) => {
let _ = smpark_unpark_via_agent(&vsock_exec_path);
}
Err(e) => {
eprintln!("[bake] smpark_unpark_direct transport error ({e}); falling back");
let _ = smpark_unpark_via_agent(&vsock_exec_path);
}
}
}
watcher_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = watcher_handle.join();
let quit_ms;
if pipelined.keep_alive {
if !pipelined.skip_warm_snapshot {
if !snap_base.is_file() {
let _ = writeln!(writer, "QUIT");
let _ = writer.flush();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"pipelined bake (keep_alive): base snapshot {} missing; see {}",
snap_base.display(),
log.display()
));
}
{
let poll_t0 = std::time::Instant::now();
let poll_deadline = poll_t0 + std::time::Duration::from_secs(180);
let mut backoff = std::time::Duration::from_millis(5);
while !snap_warm.is_file() {
if std::time::Instant::now() > poll_deadline {
let _ = writeln!(writer, "QUIT");
let _ = writer.flush();
let _ = child.wait();
let _ = std::fs::remove_dir_all(&socks_dir);
return Err(format!(
"pipelined bake (keep_alive): warm snapshot {} \
did not land on disk within 180 s (bg save \
still in flight; check disk space, IO load, \
or worker logs at {})",
snap_warm.display(),
log.display()
));
}
std::thread::sleep(backoff);
backoff = (backoff * 2).min(std::time::Duration::from_millis(50));
}
if crate::trace::enabled("timings") {
eprintln!(
"[bake] waited {:?} for warm snapshot file to land",
poll_t0.elapsed()
);
}
let warm_dir = snap_warm
.parent()
.map(|p| p.to_path_buf())
.unwrap_or_else(|| std::path::PathBuf::from("."));
if let Ok(meta_text) = std::fs::read_to_string(warm_dir.join("metadata.json")) {
if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&meta_text) {
let image = meta.get("image").and_then(|v| v.as_str()).unwrap_or("");
let memory_mib = meta
.get("memory_mib")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(0);
let baked_by_version = meta
.get("baked_by_version")
.and_then(|v| v.as_str())
.unwrap_or("");
if !image.is_empty() && memory_mib > 0 && !baked_by_version.is_empty() {
crate::dedup::auto_dedup_on_bake(
plan.snapshots_dir,
&warm_dir,
image,
memory_mib,
baked_by_version,
);
}
}
}
}
}
let force_wait_env = std::env::var("SUPERMACHINE_BAKE_WAIT_SAVE").ok();
let wait_for_save = match force_wait_env.as_deref() {
Some("0") | Some("false") => false,
Some("1") | Some("true") => true,
_ => false,
};
let do_poll_and_dedup = |snap_base_path: PathBuf,
snapshots_dir: PathBuf,
log_path: PathBuf,
fatal_on_timeout: bool,
trace_enabled_now: bool|
-> Result<(), String> {
let poll_t0 = std::time::Instant::now();
let poll_deadline = poll_t0 + std::time::Duration::from_secs(60);
let mut backoff = std::time::Duration::from_millis(5);
while !snap_base_path.is_file() {
if std::time::Instant::now() > poll_deadline {
if fatal_on_timeout {
return Err(format!(
"pipelined bake (keep_alive+skip_warm): base \
snapshot {} did not land on disk within 60s \
(bg save still in flight; check disk space, \
IO load, or worker logs at {})",
snap_base_path.display(),
log_path.display(),
));
} else {
eprintln!(
"[bake-deferred-dedup] snap_base {} did not \
land in 60s; skipping auto-dedup",
snap_base_path.display(),
);
return Ok(());
}
}
std::thread::sleep(backoff);
backoff = (backoff * 2).min(std::time::Duration::from_millis(50));
}
if trace_enabled_now {
eprintln!(
"[bake] waited {:?} for base snapshot file to land",
poll_t0.elapsed(),
);
}
let auto_dedup_t0 = std::time::Instant::now();
let snap_dir = snap_base_path
.parent()
.map(|p| p.to_path_buf())
.unwrap_or_else(|| std::path::PathBuf::from("."));
if let Ok(meta_text) = std::fs::read_to_string(snap_dir.join("metadata.json")) {
if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&meta_text) {
let image = meta.get("image").and_then(|v| v.as_str()).unwrap_or("");
let memory_mib = meta
.get("memory_mib")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(0);
let baked_by_version = meta
.get("baked_by_version")
.and_then(|v| v.as_str())
.unwrap_or("");
if !image.is_empty() && memory_mib > 0 && !baked_by_version.is_empty() {
crate::dedup::auto_dedup_on_bake(
&snapshots_dir,
&snap_dir,
image,
memory_mib,
baked_by_version,
);
}
}
}
if trace_enabled_now {
eprintln!("[bake] auto-dedup pass: {:?}", auto_dedup_t0.elapsed());
}
Ok(())
};
if wait_for_save {
if let Err(e) = do_poll_and_dedup(
snap_base.clone(),
plan.snapshots_dir.to_path_buf(),
log.clone(),
true,
crate::trace::enabled("timings"),
) {
let _ = std::fs::remove_dir_all(&socks_dir);
let _ = child.kill();
let _ = child.wait();
return Err(e);
}
} else {
let snap_base_cloned = snap_base.clone();
let snapshots_dir_cloned = plan.snapshots_dir.to_path_buf();
let log_cloned = log.clone();
let trace_now = crate::trace::enabled("timings");
std::thread::Builder::new()
.name("supermachine-bake-deferred-dedup".into())
.stack_size(256 * 1024)
.spawn(move || {
let poll_t0 = std::time::Instant::now();
let poll_deadline = poll_t0 + std::time::Duration::from_secs(60);
let mut backoff = std::time::Duration::from_millis(50);
while !snap_base_cloned.is_file() {
if std::time::Instant::now() > poll_deadline {
eprintln!(
"[bake-deferred-dedup] snap_base {} did not \
land in 60s; skipping auto-dedup",
snap_base_cloned.display(),
);
return;
}
std::thread::sleep(backoff);
backoff = (backoff * 2).min(std::time::Duration::from_millis(100));
}
if trace_now {
eprintln!(
"[bake-deferred-dedup] waited {:?} for save",
poll_t0.elapsed(),
);
}
let dedup_t0 = std::time::Instant::now();
let snap_dir = snap_base_cloned
.parent()
.map(|p| p.to_path_buf())
.unwrap_or_else(|| std::path::PathBuf::from("."));
if let Ok(meta_text) = std::fs::read_to_string(snap_dir.join("metadata.json")) {
if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&meta_text) {
let image = meta.get("image").and_then(|v| v.as_str()).unwrap_or("");
let memory_mib = meta
.get("memory_mib")
.and_then(|v| v.as_u64())
.map(|v| v as u32)
.unwrap_or(0);
let baked_by_version = meta
.get("baked_by_version")
.and_then(|v| v.as_str())
.unwrap_or("");
if !image.is_empty() && memory_mib > 0 && !baked_by_version.is_empty() {
crate::dedup::auto_dedup_on_bake(
&snapshots_dir_cloned,
&snap_dir,
image,
memory_mib,
baked_by_version,
);
}
}
}
if trace_now {
eprintln!("[bake-deferred-dedup] dedup pass: {:?}", dedup_t0.elapsed(),);
}
let _ = log_cloned; })
.ok();
}
let last_restore_path = if pipelined.skip_warm_snapshot {
snap_base.clone()
} else {
snap_warm.clone()
};
let reader_stream = reader.into_inner();
*keep_alive_out = Some(BakedWorker {
child,
vsock_mux_path: vsock_mux_path.clone(),
vsock_exec_path: vsock_exec_path.clone(),
control_path: ctl_path.clone(),
control_writer: writer,
control_reader: reader_stream,
socks_dir: socks_dir.clone(),
last_restore_path,
});
quit_ms = 0u128;
} else {
let quit_t0 = Instant::now();
writeln!(writer, "QUIT").map_err(|e| format!("write QUIT: {e}"))?;
writer.flush().map_err(|e| format!("flush QUIT: {e}"))?;
let _ = child.wait();
quit_ms = quit_t0.elapsed().as_millis();
let _ = std::fs::remove_dir_all(&socks_dir);
if !snap_base.is_file() {
return Err(format!(
"pipelined bake: base snapshot {} missing after worker exit; see {}",
snap_base.display(),
log.display()
));
}
if !pipelined.skip_warm_snapshot && !snap_warm.is_file() {
return Err(format!(
"pipelined bake: warm snapshot {} missing after worker exit; see {}",
snap_warm.display(),
log.display()
));
}
}
let vmm_bake_ms = elapsed_ms(bake_t0);
let snapshot_logical_bytes = std::fs::metadata(&snap_base).ok().map(|m| m.len());
let snapshot_physical_bytes = file_physical_bytes(&snap_base);
let warm_logical_bytes = if pipelined.skip_warm_snapshot {
None
} else {
std::fs::metadata(&snap_warm).ok().map(|m| m.len())
};
let warm_physical_bytes = if pipelined.skip_warm_snapshot {
None
} else {
file_physical_bytes(&snap_warm)
};
let metadata_t0 = Instant::now();
let runtime_sha = sha256_file(&sm22_bin)?;
let runtime_sha16 = &runtime_sha[..runtime_sha.len().min(16)];
let hash_mode =
std::env::var("SUPERMACHINE_SNAPSHOT_HASH_MODE").unwrap_or_else(|_| "fast".to_owned());
let baked_at = now_utc_iso()?;
let snapshot_id = if snap_base.is_file() {
compute_snapshot_id(plan, &snap_base, runtime_sha16, &hash_mode)?
} else {
let h = sha256_text(&format!("{native_bake_key}\n{baked_at}\npending"))?;
h[..h.len().min(16)].to_owned()
};
let warm_snapshot_id = if pipelined.skip_warm_snapshot {
snapshot_id.clone()
} else {
compute_snapshot_id(plan, &snap_warm, runtime_sha16, &hash_mode)?
};
let egress_policy = arg_value(plan.extra_args, "--egress-policy").unwrap_or("");
let balloon_target_pages = compute_balloon_target_pages(plan.memory_mib);
let metadata_prepare_ms = elapsed_ms(metadata_t0);
let total_ms = elapsed_ms(total_t0);
let timings = serde_json::json!({
"total_ms": total_ms,
"pull_inspect_ms": resolution.inspect_ms,
"rootfs_prepare_ms": layer_plan.plan_ms,
"rootfs_customize_ms": delta.prepare_ms,
"init_oci_build_ms": 0,
"squashfs_ms": squashfs_ms,
"initramfs_ms": initramfs_ms,
"vmm_bake_ms": vmm_bake_ms,
"metadata_prepare_ms": metadata_prepare_ms,
"guest_boot_to_listener_ms": listener_ready_ms,
"listener_settle_config_ms": listener_settle_ms,
"bake_ready_ms": bake_ready_ms,
"snapshot_async_send_ms": async_send_ms,
"warmup_ms": warmup_ms,
"warm_send_ms": warm_send_ms,
"quit_ms": quit_ms,
"snapshot_capture_us": base_capture_us,
"warm_save_us": warm_save_us,
"snapshot_reason": "pipelined",
});
let parsed_volumes = parse_volume_args(plan.extra_args)?;
let volume_pristines = capture_volume_pristines(&out_dir, &parsed_volumes)?;
let volumes_meta: Vec<serde_json::Value> = parsed_volumes
.iter()
.zip(volume_pristines.iter())
.map(|(v, pristine)| {
serde_json::json!({
"host_file": v.host_file.to_string_lossy(),
"guest_path": v.guest_path,
"size_bytes": v.size_bytes,
"pristine": pristine.to_string_lossy(),
})
})
.collect();
let warm_volumes_meta: Vec<serde_json::Value> = if pipelined.skip_warm_snapshot {
Vec::new()
} else {
let warm_pristines = capture_volume_pristines(&pipelined.warm_dir, &parsed_volumes)?;
parsed_volumes
.iter()
.zip(warm_pristines.iter())
.map(|(v, pristine)| {
serde_json::json!({
"host_file": v.host_file.to_string_lossy(),
"guest_path": v.guest_path,
"size_bytes": v.size_bytes,
"pristine": pristine.to_string_lossy(),
})
})
.collect()
};
let mounts_meta: Vec<serde_json::Value> = arg_values(plan.extra_args, "--mount")
.into_iter()
.filter_map(parse_mount_arg)
.collect();
let restart_policy = arg_value(plan.extra_args, "--restart").unwrap_or("no");
let health_cmd = arg_value(plan.extra_args, "--health-cmd").unwrap_or("");
let health_interval_secs: u32 = arg_value(plan.extra_args, "--health-interval")
.and_then(|s| s.parse().ok())
.unwrap_or(if health_cmd.is_empty() { 0 } else { 30 });
let pre_exec_sync_recorded = pipelined.use_pre_exec_trigger
&& std::env::var("SUPERMACHINE_PRE_EXEC_SYNC")
.map(|v| v != "0" && v != "false")
.unwrap_or(true);
let base_metadata = serde_json::json!({
"name": plan.snapshot_name(),
"image": plan.image,
"port": plan.guest_port,
"memory_mib": plan.memory_mib,
"cmd": resolution.effective_cmd,
"snapshot_sha16": snapshot_id,
"snapshot_id16": snapshot_id,
"snapshot_hash_mode": hash_mode,
"runtime_sha16": runtime_sha16,
"layers": layer_paths,
"volumes": volumes_meta,
"mounts": mounts_meta,
"restart_policy": restart_policy,
"health_cmd": health_cmd,
"health_interval_secs": health_interval_secs,
"delta_squashfs": delta_squashfs,
"rootfs_squashfs": serde_json::Value::Null,
"snapshot_base": snap_base,
"snapshot_logical_bytes": snapshot_logical_bytes,
"snapshot_physical_bytes": snapshot_physical_bytes,
"snapshot_ram_data_mib": serde_json::Value::Null,
"snapshot_ram_zero_mib": serde_json::Value::Null,
"init_cpio": init_cpio,
"kernel": kernel,
"egress_policy": egress_policy,
"vcpus": plan.vcpus,
"ttl_seconds": serde_json::Value::Null,
"egress_bps": serde_json::Value::Null,
"cpu_nice": serde_json::Value::Null,
"cpu_affinity": serde_json::Value::Null,
"cpu_qos": serde_json::Value::Null,
"balloon_target_pages": balloon_target_pages,
"auth": {"type": "none"},
"baked_by_version": env!("CARGO_PKG_VERSION"),
"native_bake_key": native_bake_key,
"native_bake_inputs": native_bake_inputs,
"timings": timings,
"baked_at": baked_at,
"supermachine_version": "supermachine",
"pipelined": true,
"tsi_token": tsi_token_hex,
"pre_exec_sync": pre_exec_sync_recorded,
});
let base_meta_path = out_dir.join("metadata.json");
std::fs::write(
&base_meta_path,
serde_json::to_vec_pretty(&base_metadata)
.map_err(|e| format!("encode base metadata: {e}"))?,
)
.map_err(|e| format!("write base metadata {}: {e}", base_meta_path.display()))?;
if pipelined.skip_warm_snapshot {
if crate::trace::enabled("phases") {
eprintln!(
"[phases] bake fn returning ({:?} since bake start, total_ms={})",
total_t0.elapsed(),
total_ms,
);
}
return Ok(NativeBakeResult {
total_ms,
timings,
reused: false,
});
}
let warm_name = format!("{}__warm__{}", plan.snapshot_name(), pipelined.warm_tag);
let warm_metadata = serde_json::json!({
"name": warm_name,
"image": plan.image,
"port": plan.guest_port,
"memory_mib": plan.memory_mib,
"cmd": resolution.effective_cmd,
"snapshot_sha16": warm_snapshot_id,
"snapshot_id16": warm_snapshot_id,
"snapshot_hash_mode": hash_mode,
"runtime_sha16": runtime_sha16,
"layers": layer_paths,
"volumes": warm_volumes_meta,
"mounts": mounts_meta,
"restart_policy": restart_policy,
"health_cmd": health_cmd,
"health_interval_secs": health_interval_secs,
"delta_squashfs": delta_squashfs,
"rootfs_squashfs": serde_json::Value::Null,
"snapshot_base": snap_warm,
"snapshot_logical_bytes": warm_logical_bytes,
"snapshot_physical_bytes": warm_physical_bytes,
"snapshot_ram_data_mib": serde_json::Value::Null,
"snapshot_ram_zero_mib": serde_json::Value::Null,
"init_cpio": init_cpio,
"kernel": kernel,
"egress_policy": egress_policy,
"vcpus": plan.vcpus,
"ttl_seconds": serde_json::Value::Null,
"egress_bps": serde_json::Value::Null,
"cpu_nice": serde_json::Value::Null,
"cpu_affinity": serde_json::Value::Null,
"cpu_qos": serde_json::Value::Null,
"balloon_target_pages": balloon_target_pages,
"auth": {"type": "none"},
"baked_by_version": env!("CARGO_PKG_VERSION"),
"native_bake_key": native_bake_key,
"native_bake_inputs": native_bake_inputs,
"timings": timings,
"baked_at": baked_at,
"supermachine_version": "supermachine",
"pipelined": true,
"warm_tag": pipelined.warm_tag,
"warm_of": plan.snapshot_name(),
"bytes_written": warm_bytes,
"tsi_token": tsi_token_hex,
});
let warm_meta_path = pipelined.warm_dir.join("metadata.json");
std::fs::write(
&warm_meta_path,
serde_json::to_vec_pretty(&warm_metadata)
.map_err(|e| format!("encode warm metadata: {e}"))?,
)
.map_err(|e| format!("write warm metadata {}: {e}", warm_meta_path.display()))?;
Ok(NativeBakeResult {
total_ms,
timings,
reused: false,
})
}
/// Derive the 16-hex-char identifier recorded for a baked snapshot.
///
/// * `"content"` — hash the snapshot file's bytes.
/// * `"fast"` — hash a NUL-separated tuple of snapshot name, image reference,
///   `runtime_sha16`, file length and mtime (in nanoseconds since the Unix
///   epoch), avoiding a full read of the snapshot.
///
/// The digest is truncated to at most its first 16 characters.
///
/// # Errors
/// Any other `hash_mode`, a failed hash/stat, or an mtime before the epoch.
fn compute_snapshot_id(
    plan: &BakePlan<'_>,
    snap_path: &Path,
    runtime_sha16: &str,
    hash_mode: &str,
) -> Result<String, String> {
    let digest = match hash_mode {
        "content" => sha256_file(snap_path)?,
        "fast" => {
            let stat = std::fs::metadata(snap_path)
                .map_err(|e| format!("stat snapshot {}: {e}", snap_path.display()))?;
            let modified = stat
                .modified()
                .map_err(|e| format!("snapshot mtime: {e}"))?;
            let since_epoch = modified
                .duration_since(UNIX_EPOCH)
                .map_err(|e| format!("snapshot mtime before epoch: {e}"))?;
            let mtime_ns = since_epoch.as_nanos();
            let material = format!(
                "{name}\0{image}\0{runtime_sha16}\0{len}\0{mtime_ns}",
                name = plan.snapshot_name(),
                image = plan.image,
                len = stat.len(),
            );
            sha256_text(&material)?
        }
        _ => return Err("SUPERMACHINE_SNAPSHOT_HASH_MODE must be fast or content".to_owned()),
    };
    let cut = digest.len().min(16);
    Ok(digest[..cut].to_owned())
}
/// Run a pipelined bake of `request` on the native supermachine runtime.
///
/// When `pipelined.skip_warm_snapshot` is set and `SUPERMACHINE_NATIVE_BAKE_TAIL`
/// is unset or anything other than `0`/`false`, the bake first tries to finish
/// without building anything:
/// the sibling-clone fast path (checked before and again after taking the
/// per-name bake lock) and then the early base-snapshot reuse path.
///
/// Otherwise it resolves the image, materializes missing layers and the
/// supermachine delta squashfs, computes the native bake key and hands off to
/// `run_native_supermachine_bake_pipelined`, which consumes `pipelined`.
///
/// # Returns
/// `Ok(Some(worker))` when the inner bake left a worker alive (written through
/// `keep_alive_out`), `Ok(None)` when a fast path satisfied the request or no
/// worker was kept.
///
/// # Errors
/// A non-`supermachine` runtime, or any error from locking, image resolution,
/// layer/delta materialization, key computation or the bake itself.
pub fn run_push_pipelined(
    request: &BakeRequest,
    run_t0: Instant,
    root: &Path,
    pipelined: PipelinedWarmup<'_>,
) -> Result<Option<BakedWorker>, String> {
    let _span = tracing::info_span!(
        "supermachine.bake_pipelined",
        image = %request.image,
        memory_mib = request.memory_mib,
        vcpus = request.vcpus,
        warm_tag = %pipelined.warm_tag,
    )
    .entered();
    let plan = BakePlan::from_request(request);
    if plan.runtime != "supermachine" {
        return Err(format!(
            "pipelined bake only supports the native supermachine runtime, got {:?}",
            plan.runtime
        ));
    }
    // Read the tail toggle once so the pre-lock, post-lock and early-reuse
    // checks below all agree, even if the environment changes mid-bake.
    let fast_paths_enabled = pipelined.skip_warm_snapshot
        && std::env::var("SUPERMACHINE_NATIVE_BAKE_TAIL")
            .map(|v| v != "0" && v != "false")
            .unwrap_or(true);
    if fast_paths_enabled {
        if let Some(_result) = try_sibling_clone_fast_path(&plan, root, run_t0)? {
            if trace_enabled() {
                eprintln!(
                    "supermachine: pipelined-bake sibling-clone total={}ms",
                    elapsed_ms(run_t0)
                );
            }
            return Ok(None);
        }
    }
    let _name_lock = acquire_bake_name_lock(&plan, run_t0)?;
    // Check again now that the name lock is held; presumably another bake that
    // held the lock may have produced a usable sibling while we waited.
    if fast_paths_enabled && try_sibling_clone_fast_path(&plan, root, run_t0)?.is_some() {
        if trace_enabled() {
            eprintln!(
                "supermachine: pipelined-bake sibling-clone (after name lock) total={}ms",
                elapsed_ms(run_t0)
            );
        }
        return Ok(None);
    }
    let source = select_image_source(plan.image, plan.arch())?;
    let resolution = resolve_image(&plan, source.as_ref())?;
    if trace_enabled() {
        emit_image_resolution_trace(&resolution);
    }
    if fast_paths_enabled {
        if let Some(_result) =
            native_supermachine_early_reuse_snapshot(&plan, &resolution, root, run_t0)?
        {
            if trace_enabled() {
                eprintln!(
                    "supermachine: pipelined-bake reused base snapshot total={}ms",
                    elapsed_ms(run_t0)
                );
            }
            return Ok(None);
        }
    }
    let layer_plan = plan_layers(&plan, &resolution, source.as_ref())?;
    let layer_materialization_result = match layer_plan.as_ref() {
        Some(layer_plan) => {
            materialize_missing_layers(plan.image, layer_plan, source.as_ref()).map(Some)
        }
        None => Ok(None),
    };
    // Remove the layer save work dir before propagating any materialization
    // error, so it is cleaned up on both outcomes.
    if let Some(work_dir) = layer_plan
        .as_ref()
        .and_then(|layer_plan| layer_plan.save_work_dir.as_deref())
    {
        let _ = std::fs::remove_dir_all(work_dir);
    }
    let _layer_materialization = layer_materialization_result?;
    let delta_materialization = materialize_delta_cache(&plan, &resolution, root)?;
    let layer_plan = layer_plan
        .as_ref()
        .ok_or_else(|| "pipelined bake: missing layer plan".to_owned())?;
    let delta = delta_materialization
        .as_ref()
        .ok_or_else(|| "pipelined bake: missing delta cache".to_owned())?;
    let (native_bake_key, native_bake_inputs) =
        native_supermachine_bake_key(&plan, &resolution, layer_plan, delta, root)?;
    let mut keep_alive_out: Option<BakedWorker> = None;
    let result = run_native_supermachine_bake_pipelined(
        &plan,
        &resolution,
        layer_plan,
        delta,
        root,
        &native_bake_key,
        &native_bake_inputs,
        pipelined,
        &mut keep_alive_out,
    )?;
    if trace_enabled() {
        eprintln!(
            "supermachine: pipelined bake finished after {}ms total={}ms{}",
            result.total_ms,
            elapsed_ms(run_t0),
            if keep_alive_out.is_some() {
                " (worker kept alive for warm-handoff)"
            } else {
                ""
            },
        );
        eprintln!("supermachine: bake timings {}", result.timings);
    }
    Ok(keep_alive_out)
}
fn materialize_delta_cache(
plan: &BakePlan<'_>,
resolution: &ImageResolution,
root: &Path,
) -> Result<Option<DeltaMaterialization>, String> {
if plan.runtime != "supermachine" {
return Ok(None);
}
let prepare_t0 = Instant::now();
let mut result = DeltaMaterialization {
prepare_ms: 0,
materialize_ms: 0,
cache_hit: false,
skipped: None,
key: None,
cache_path: None,
};
if arg_present(plan.extra_args, "--inbound-tls-autogen") {
result.prepare_ms = elapsed_ms(prepare_t0);
result.skipped = Some("inbound-tls-autogen".to_owned());
return Ok(Some(result));
}
let cmd_json = plan.cmd_override.map(ToOwned::to_owned).unwrap_or_else(|| {
serde_json::to_string(&resolution.effective_cmd).unwrap_or_else(|_| "[]".to_owned())
});
let mut key_material = format!("cmd={cmd_json}");
let stage = temp_work_dir("supermachine-delta-stage")?;
let materialize_t0 = Instant::now();
let materialized = (|| {
write_lines(&stage.join(".supermachine-cmd"), &resolution.effective_cmd)?;
if let Some(cwd) = resolution
.working_dir
.as_deref()
.filter(|cwd| !cwd.is_empty() && *cwd != "/")
{
std::fs::write(stage.join(".supermachine-workdir"), format!("{cwd}\n"))
.map_err(|e| format!("write .supermachine-workdir: {e}"))?;
key_material.push_str(&format!("\ncwd={cwd}"));
}
if let Some(user) = resolution.user.as_deref().filter(|user| !user.is_empty()) {
std::fs::write(stage.join(".supermachine-user"), format!("{user}\n"))
.map_err(|e| format!("write .supermachine-user: {e}"))?;
key_material.push_str(&format!("\nuser={user}"));
}
if let Some(hostname) = arg_value(plan.extra_args, "--hostname") {
if hostname.is_empty()
|| hostname.len() > 63
|| hostname.contains(|c: char| c.is_whitespace() || c == '\0')
{
return Err(format!(
"--hostname {hostname:?} invalid (1..=63 chars, no whitespace)"
));
}
std::fs::write(
stage.join(".supermachine-hostname"),
format!("{hostname}\n"),
)
.map_err(|e| format!("write .supermachine-hostname: {e}"))?;
key_material.push_str(&format!("\nhostname={hostname}"));
}
for extra in arg_values(plan.extra_args, "--extra-file") {
let Some((host, guest)) = extra.split_once(':') else {
return Err(format!("bad --extra-file '{extra}' (want host:guest)"));
};
let host_path = PathBuf::from(host);
let guest_path = guest.trim_start_matches('/');
let dst = stage.join(guest_path);
if let Some(parent) = dst.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("create extra parent {}: {e}", parent.display()))?;
}
std::fs::copy(&host_path, &dst).map_err(|e| {
format!(
"copy extra {} -> {}: {e}",
host_path.display(),
dst.display()
)
})?;
let mode = file_mode_octal(&host_path)?;
let sum = sha256_file(&host_path)?;
key_material.push_str(&format!("\nextra={guest}:{mode}:{sum}"));
}
let tls_cert = arg_value(plan.extra_args, "--inbound-tls-cert");
let tls_key = arg_value(plan.extra_args, "--inbound-tls-key");
if tls_cert.is_some() || tls_key.is_some() {
let cert = tls_cert
.ok_or_else(|| "need both --inbound-tls-cert and --inbound-tls-key".to_owned())?;
let key = tls_key
.ok_or_else(|| "need both --inbound-tls-cert and --inbound-tls-key".to_owned())?;
let tls_dir = stage.join("etc/supermachine");
std::fs::create_dir_all(&tls_dir)
.map_err(|e| format!("create TLS dir {}: {e}", tls_dir.display()))?;
let cert_path = PathBuf::from(cert);
let key_path = PathBuf::from(key);
std::fs::copy(&cert_path, tls_dir.join("cert.pem"))
.map_err(|e| format!("copy TLS cert {}: {e}", cert_path.display()))?;
std::fs::copy(&key_path, tls_dir.join("key.pem"))
.map_err(|e| format!("copy TLS key {}: {e}", key_path.display()))?;
set_mode(&tls_dir.join("cert.pem"), 0o644)?;
set_mode(&tls_dir.join("key.pem"), 0o600)?;
let cert_sum = sha256_file(&cert_path)?;
let key_sum = sha256_file(&key_path)?;
key_material.push_str(&format!("\ntls=provided:{cert_sum}:{key_sum}"));
}
for dir in ["proc", "sys", "dev", "dev/shm", "tmp", "run"] {
std::fs::create_dir_all(stage.join(dir))
.map_err(|e| format!("create delta dir {dir}: {e}"))?;
}
let init = ensure_init_oci(root)?;
let init_dst = stage.join("init");
std::fs::copy(&init, &init_dst)
.map_err(|e| format!("copy init-oci {}: {e}", init.display()))?;
set_mode(&init_dst, 0o755)?;
key_material.push_str(&format!("\ninit={}", file_size_mtime(&init)?));
let volumes = parse_volume_args(plan.extra_args)?;
if !volumes.is_empty() {
let body: String = volumes
.iter()
.map(|v| format!("{}\n", v.guest_path))
.collect();
std::fs::write(stage.join(".supermachine-volumes"), &body)
.map_err(|e| format!("write .supermachine-volumes: {e}"))?;
for v in &volumes {
key_material.push_str(&format!("\nvolume={}", v.guest_path));
}
}
let binds: Vec<(String, String)> = arg_values(plan.extra_args, "--mount")
.into_iter()
.filter_map(|raw| {
let parts: Vec<&str> = raw.splitn(4, ':').collect();
if parts.len() < 3 || parts[1].is_empty() || parts[2].is_empty() {
return None;
}
Some((parts[1].to_owned(), parts[2].to_owned()))
})
.collect();
if !binds.is_empty() {
let body: String = binds.iter().map(|(t, p)| format!("{t}\t{p}\n")).collect();
std::fs::write(stage.join(".supermachine-virtiofs-binds"), &body)
.map_err(|e| format!("write .supermachine-virtiofs-binds: {e}"))?;
for (t, p) in &binds {
key_material.push_str(&format!("\nvirtiofs-bind={t}={p}"));
}
}
let agent = ensure_supermachine_agent(root)?;
let agent_dst = stage.join("supermachine-agent");
std::fs::copy(&agent, &agent_dst)
.map_err(|e| format!("copy supermachine-agent {}: {e}", agent.display()))?;
set_mode(&agent_dst, 0o755)?;
key_material.push_str(&format!("\nagent={}", file_size_mtime(&agent)?));
let key = sha256_text(&format!("{key_material}\n"))?;
let delta_cache_dir = layer_cache_dir().join("deltas");
std::fs::create_dir_all(&delta_cache_dir)
.map_err(|e| format!("create delta cache dir {}: {e}", delta_cache_dir.display()))?;
let cache_path = delta_cache_dir.join(format!("{key}.squashfs"));
result.prepare_ms = elapsed_ms(prepare_t0);
result.key = Some(key);
result.cache_path = Some(cache_path.clone());
if cache_path.is_file() {
result.cache_hit = true;
return Ok(());
}
let unique = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
let tmp = delta_cache_dir.join(format!(
".delta.{}.{}.squashfs.tmp",
std::process::id(),
unique
));
let _ = std::fs::remove_file(&tmp);
squashfs::write_squashfs(&stage, &tmp, &squashfs::Ownership::FromMetadata)
.map_err(|e| format!("squashfs delta cache: {e}"))?;
if cache_path.is_file() {
let _ = std::fs::remove_file(&tmp);
result.cache_hit = true;
} else {
std::fs::rename(&tmp, &cache_path).map_err(|e| {
format!(
"install delta cache {} -> {}: {e}",
tmp.display(),
cache_path.display()
)
})?;
result.cache_hit = false;
}
Ok(())
})();
result.materialize_ms = elapsed_ms(materialize_t0);
let _ = std::fs::remove_dir_all(&stage);
materialized?;
Ok(Some(result))
}
/// Apply the pull policy for `plan.image`, inspect it, and compute the runtime
/// facts the bake needs: effective argv, working dir, user, environment and
/// image identity.
///
/// Pull policies:
/// * `always` — always pull.
/// * `missing` — skip the pull only when the local copy reports arch `arm64`.
/// * `never` — error if the image is absent locally.
///
/// `--workdir`, `--user` and `--entrypoint` in `plan.extra_args` override the
/// image's `WorkingDir`, `User` and `Entrypoint`. The effective command is
/// resolved in this order:
/// 1. `--cmd`, a JSON string array: appended to `--entrypoint` if that is
///    non-empty, otherwise to the image `Entrypoint`.
/// 2. `--entrypoint` alone: the entrypoint followed by the image `Cmd`.
/// 3. The command recorded by a previous bake (`previous_metadata_cmd`).
/// 4. The image `Entrypoint` followed by `Cmd`.
///
/// # Errors
/// Unknown pull policy, a missing image under `never`, pull/inspect failures,
/// a malformed `--cmd`, an empty `--entrypoint` without `--cmd`, or an image
/// with no command at all.
fn resolve_image(plan: &BakePlan<'_>, source: &dyn ImageSource) -> Result<ImageResolution, String> {
    let t0 = Instant::now();
    let local_arch = source.local_arch(plan.image);
    let pull_action = match plan.pull_policy {
        "always" => {
            source.pull(plan.image, true)?;
            "always".to_owned()
        }
        "missing" => {
            if local_arch.as_deref() == Some("arm64") {
                "skipped-local-arm64".to_owned()
            } else {
                source.pull(plan.image, false)?;
                "missing-pulled-arm64".to_owned()
            }
        }
        "never" => {
            if local_arch.is_none() {
                return Err(format!(
                    "image {} not present locally and --pull never was set",
                    plan.image
                ));
            }
            "never".to_owned()
        }
        other => return Err(format!("unknown --pull policy: {other}")),
    };
    let image_obj = source.inspect(plan.image)?;
    let cfg = image_obj
        .get("Config")
        .ok_or_else(|| "image inspect missing Config".to_owned())?;
    let image_id = image_obj
        .get("Id")
        .and_then(|v| v.as_str())
        .map(|s| s.strip_prefix("sha256:").unwrap_or(s).to_owned());
    let architecture = image_obj
        .get("Architecture")
        .and_then(|v| v.as_str())
        .map(ToOwned::to_owned);
    let env_count = cfg
        .get("Env")
        .and_then(|v| v.as_array())
        .map(|v| v.len())
        .unwrap_or(0);
    let env = value_string_array(cfg.get("Env"));
    let workdir_override = arg_value(plan.extra_args, "--workdir").map(ToOwned::to_owned);
    let user_override = arg_value(plan.extra_args, "--user").map(ToOwned::to_owned);
    let working_dir = workdir_override.or_else(|| {
        cfg.get("WorkingDir")
            .and_then(|v| v.as_str())
            .filter(|s| !s.is_empty())
            .map(ToOwned::to_owned)
    });
    let user = user_override.or_else(|| {
        cfg.get("User")
            .and_then(|v| v.as_str())
            .filter(|s| !s.is_empty())
            .map(ToOwned::to_owned)
    });
    let entrypoint_override = arg_value(plan.extra_args, "--entrypoint").map(ToOwned::to_owned);
    let effective_cmd = if let Some(cmd_override) = plan.cmd_override {
        let cmd_argv = serde_json::from_str::<Vec<String>>(cmd_override)
            .map_err(|e| format!("--cmd must be a JSON string array: {e}"))?;
        match &entrypoint_override {
            Some(ep) if !ep.is_empty() => {
                let mut argv = vec![ep.clone()];
                argv.extend(cmd_argv);
                argv
            }
            _ => {
                let mut argv = value_string_array(cfg.get("Entrypoint"));
                argv.extend(cmd_argv);
                argv
            }
        }
    } else if let Some(ep) = &entrypoint_override {
        // Without --cmd an empty entrypoint would become argv[0] == "", which is
        // not an executable path; fail here instead of baking an unrunnable image.
        if ep.is_empty() {
            return Err("--entrypoint cannot be empty without --cmd".to_owned());
        }
        let mut argv = vec![ep.clone()];
        argv.extend(value_string_array(cfg.get("Cmd")));
        argv
    } else if let Some(prev_cmd) = previous_metadata_cmd(plan) {
        prev_cmd
    } else {
        let mut argv = value_string_array(cfg.get("Entrypoint"));
        argv.extend(value_string_array(cfg.get("Cmd")));
        if argv.is_empty() {
            return Err(format!(
                "image {} has no CMD/ENTRYPOINT; pass --cmd '<argv json>'",
                plan.image
            ));
        }
        argv
    };
    Ok(ImageResolution {
        local_arch,
        architecture,
        image_id,
        effective_cmd,
        working_dir,
        user,
        env,
        env_count,
        pull_action,
        inspect_ms: elapsed_ms(t0),
    })
}
/// Ask the guest agent to park (action `smpark_park`), with retries.
/// Returns whether any attempt was acknowledged.
fn smpark_park_via_agent(vsock_exec_path: &Path) -> bool {
    const PARK_ACTION: &str = "smpark_park";
    smpark_agent_rpc_with_retry(vsock_exec_path, PARK_ACTION)
}
/// Ask the guest agent to unpark (action `smpark_unpark`), with retries.
/// Returns whether any attempt was acknowledged.
fn smpark_unpark_via_agent(vsock_exec_path: &Path) -> bool {
    const UNPARK_ACTION: &str = "smpark_unpark";
    smpark_agent_rpc_with_retry(vsock_exec_path, UNPARK_ACTION)
}
/// Send `{"action": action}` to the guest agent over `vsock_exec_path`.
///
/// Makes up to three attempts, each waiting up to 5 s for an acknowledgement,
/// with a 200 ms pause between failed attempts. Every outcome is logged to
/// stderr. Returns `true` on the first acknowledged attempt and `false` once
/// all attempts have failed.
fn smpark_agent_rpc_with_retry(vsock_exec_path: &Path, action: &str) -> bool {
    const ATTEMPTS: u32 = 3;
    const ACK_TIMEOUT: Duration = Duration::from_secs(5);
    const RETRY_PAUSE: Duration = Duration::from_millis(200);
    let request = serde_json::json!({ "action": action });
    let mut attempt = 0;
    while attempt < ATTEMPTS {
        attempt += 1;
        let started = Instant::now();
        let Err(err) =
            crate::exec::send_control_with_ack(vsock_exec_path, &request, Some(ACK_TIMEOUT))
        else {
            // Only mention the attempt number when we needed a retry.
            let retry_note = if attempt > 1 {
                format!(" (attempt {attempt})")
            } else {
                String::new()
            };
            eprintln!(
                "[bake] {action}: OK in {} ms{}",
                started.elapsed().as_millis(),
                retry_note
            );
            return true;
        };
        eprintln!(
            "[bake] {action}: attempt {attempt}/{ATTEMPTS} FAIL in {} ms ({err})",
            started.elapsed().as_millis()
        );
        if attempt < ATTEMPTS {
            std::thread::sleep(RETRY_PAUSE);
        }
    }
    eprintln!("[bake] {action}: FAIL after {ATTEMPTS} attempts");
    false
}
/// Ask the supervisor to unpark directly by sending `SMPARK_UNPARK_DIRECT` on
/// the control stream.
///
/// Reads one reply line using `read_supervisor_line_skip_save_notifications`:
/// * `DONE_SMPARK_UNPARK` gives `Ok(true)`.
/// * `ERR_SMPARK_UNPARK <reason>` gives `Ok(false)`, so the caller can fall back.
///
/// # Errors
/// Write/flush/read failures, or any other reply line.
fn smpark_unpark_via_supervisor(
    writer: &mut std::os::unix::net::UnixStream,
    reader: &mut std::io::BufReader<std::os::unix::net::UnixStream>,
) -> Result<bool, String> {
    use std::io::Write;
    let started = Instant::now();
    writeln!(writer, "SMPARK_UNPARK_DIRECT")
        .map_err(|e| format!("write SMPARK_UNPARK_DIRECT: {e}"))?;
    writer
        .flush()
        .map_err(|e| format!("flush SMPARK_UNPARK_DIRECT: {e}"))?;
    let mut reply = String::new();
    read_supervisor_line_skip_save_notifications(reader, &mut reply)
        .map_err(|e| format!("read DONE_SMPARK_UNPARK: {e}"))?;
    match reply.trim() {
        "DONE_SMPARK_UNPARK" => {
            eprintln!(
                "[bake] smpark_unpark_direct: OK in {} ms",
                started.elapsed().as_millis()
            );
            Ok(true)
        }
        ack => match ack.strip_prefix("ERR_SMPARK_UNPARK ") {
            Some(rest) => {
                eprintln!(
                    "[bake] smpark_unpark_direct: unavailable in {} ms ({rest}); falling back",
                    started.elapsed().as_millis()
                );
                Ok(false)
            }
            None => Err(format!("bad ack: {ack:?}")),
        },
    }
}
#[cfg(test)]
mod bake_helper_tests {
    use super::*;

    /// Registry/tag punctuation (`.`, `:`, `/`) is replaced with `_`.
    #[test]
    fn snapshot_name_sanitizes_ref_punctuation() {
        let cases = [
            ("ubuntu:22.04", "ubuntu_22_04"),
            ("ghcr.io/owner/img:tag", "ghcr_io_owner_img_tag"),
        ];
        for (image, expected) in cases {
            assert_eq!(snapshot_name_for_image(image), expected);
        }
    }

    /// Very long references are truncated to exactly 60 characters.
    #[test]
    fn snapshot_name_is_capped_at_60_chars() {
        let image = format!("repo/{}", "a".repeat(200));
        let capped = snapshot_name_for_image(&image);
        assert_eq!(capped.chars().count(), 60);
    }

    /// The `sha256:` prefix is optional and stripped when present.
    #[test]
    fn strip_sha256_prefix() {
        for input in ["sha256:deadbeef", "deadbeef"] {
            assert_eq!(strip_sha256(input), "deadbeef");
        }
    }

    /// A 64-char hex digest is accepted with or without the prefix.
    #[test]
    fn sha256_path_component_accepts_clean_hex() {
        let digest = "a".repeat(64);
        for input in [format!("sha256:{digest}"), digest.clone()] {
            assert_eq!(sha256_path_component(&input).unwrap(), digest);
        }
    }

    /// Empty digests and anything containing path separators or `..` are refused.
    #[test]
    fn sha256_path_component_rejects_traversal_and_empty() {
        for bad in [
            "sha256:",
            "sha256:../etc/passwd",
            "sha256:ab/cd",
            "sha256:../../x",
        ] {
            assert!(sha256_path_component(bad).is_err(), "must reject {bad:?}");
        }
    }

    /// Arrays map element-wise; a non-empty string becomes a one-element list;
    /// empty strings, numbers, null and absent values yield nothing.
    #[test]
    fn value_string_array_forms() {
        let array = serde_json::json!(["a", "b", "c"]);
        let expected: Vec<String> = ["a", "b", "c"].iter().map(|s| s.to_string()).collect();
        assert_eq!(value_string_array(Some(&array)), expected);

        let single = serde_json::json!("solo");
        assert_eq!(value_string_array(Some(&single)), vec!["solo".to_string()]);

        for empty in [
            serde_json::json!(""),
            serde_json::json!(42),
            serde_json::Value::Null,
        ] {
            assert!(
                value_string_array(Some(&empty)).is_empty(),
                "{empty:?} should yield no strings"
            );
        }
        assert!(value_string_array(None).is_empty());
    }
}
#[cfg(test)]
mod confined_join_tests {
    use super::*;

    /// Relative paths that stay inside the root are joined onto it.
    #[test]
    fn joins_clean_relative_paths_under_root() {
        let base = Path::new("/save");
        let cases = [
            ("blobs/sha256/abc", "/save/blobs/sha256/abc"),
            ("config.json", "/save/config.json"),
            ("./x", "/save/./x"),
        ];
        for (relative, joined) in cases {
            assert_eq!(confined_join(base, relative).unwrap(), Path::new(joined));
        }
    }

    /// `..` escapes and absolute paths are always refused.
    #[test]
    fn rejects_traversal_and_absolute_paths() {
        let base = Path::new("/save");
        let hostile = [
            "../../../../etc/passwd",
            "a/../../etc/passwd",
            "/etc/passwd",
            "/abs",
            "../x",
        ];
        for candidate in hostile {
            let outcome = confined_join(base, candidate);
            assert!(outcome.is_err(), "must reject traversal: {candidate:?}");
        }
    }
}
#[cfg(test)]
// Tests for `BakeDeadlineGuard`: each spawns a real `sleep 60` child and checks
// how the guard treats it. Guard lifetimes matter here: a named `_g` binding
// keeps the guard armed until the end of the test, while an inner `{ }` scope
// drops it early.
mod bake_deadline_guard_tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
// Spawn a long-lived child process to act as the "stuck" bake worker.
fn spawn_sleeper() -> std::process::Child {
std::process::Command::new("sleep")
.arg("60")
.spawn()
.expect("spawn sleep")
}
// An armed guard with a short (150 ms) deadline must SIGKILL the child and
// set the shared `timed_out` flag.
#[test]
fn deadline_guard_kills_stuck_child() {
use std::os::unix::process::ExitStatusExt;
let mut child = spawn_sleeper();
let pid = child.id() as i32;
let timed_out = Arc::new(AtomicBool::new(false));
// `_g` (not `_`) so the guard stays alive for the whole polling loop.
let _g = BakeDeadlineGuard::arm_with(pid, Duration::from_millis(150), timed_out.clone());
let mut status = None;
// Poll for up to 60 * 50 ms = 3 s, well past the 150 ms deadline.
for _ in 0..60 {
std::thread::sleep(Duration::from_millis(50));
if let Some(s) = child.try_wait().expect("try_wait") {
status = Some(s);
break;
}
}
let status = status.expect("watchdog should have killed the stuck child");
assert_eq!(
status.signal(),
Some(libc::SIGKILL),
"child must be SIGKILLed by the deadline guard"
);
assert!(
timed_out.load(Ordering::Acquire),
"guard must flag timed_out so the driver surfaces a deadline error"
);
}
// Dropping the guard before its 5 s deadline must cancel it: the flag stays
// clear and the child survives.
#[test]
fn deadline_guard_cancelled_on_drop_spares_child() {
let mut child = spawn_sleeper();
let pid = child.id() as i32;
let timed_out = Arc::new(AtomicBool::new(false));
{
// Guard is dropped at the end of this scope.
let _g = BakeDeadlineGuard::arm_with(pid, Duration::from_secs(5), timed_out.clone());
}
std::thread::sleep(Duration::from_millis(400));
assert!(
!timed_out.load(Ordering::Acquire),
"a cancelled guard must NOT time out"
);
assert!(
child.try_wait().expect("try_wait").is_none(),
"child must still be alive after the guard was cancelled"
);
// Clean up the sleeper ourselves.
let _ = child.kill();
let _ = child.wait();
}
// `arm` with a deadline of 0 seconds must not start a watchdog (no handle)
// and must leave the child alone.
#[test]
fn deadline_secs_zero_disables_guard() {
let mut child = spawn_sleeper();
let pid = child.id() as i32;
let timed_out = Arc::new(AtomicBool::new(false));
let g = BakeDeadlineGuard::arm(pid, 0, timed_out.clone());
assert!(
g.handle.is_none(),
"secs=0 must not spawn a watchdog thread"
);
std::thread::sleep(Duration::from_millis(200));
assert!(!timed_out.load(Ordering::Acquire));
assert!(
child.try_wait().expect("try_wait").is_none(),
"child survives"
);
// Clean up the sleeper ourselves.
let _ = child.kill();
let _ = child.wait();
}
}