use super::*;
pub(super) fn parse_registry_image_ref(image: &str) -> Result<RegistryImageRef, String> {
let (name, reference) = if let Some((name, digest)) = image.rsplit_once('@') {
(name, digest.to_owned())
} else {
let slash = image.rfind('/');
let colon = image.rfind(':');
if let Some(colon) = colon.filter(|colon| slash.map(|slash| *colon > slash).unwrap_or(true))
{
(&image[..colon], image[colon + 1..].to_owned())
} else {
(image, "latest".to_owned())
}
};
let first = name.split('/').next().unwrap_or_default();
let has_registry = first.contains('.') || first.contains(':') || first == "localhost";
let (registry, repository) = if has_registry {
let Some((registry, repository)) = name.split_once('/') else {
return Err(format!("invalid registry image reference: {image}"));
};
(registry.to_owned(), repository.to_owned())
} else {
let repository = if name.contains('/') {
name.to_owned()
} else {
format!("library/{name}")
};
("registry-1.docker.io".to_owned(), repository)
};
if repository.is_empty() || reference.is_empty() {
return Err(format!("invalid image reference: {image}"));
}
Ok(RegistryImageRef {
registry,
repository,
reference,
})
}
pub(super) fn registry_accept_header() -> &'static str {
"application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.v2+json"
}
#[derive(Default)]
pub(super) enum RegistryAuth<'a> {
#[default]
None,
Bearer(&'a str),
Basic(&'a RegistryCreds),
}
const MAX_REGISTRY_REDIRECTS: usize = 8;
pub(super) fn curl_request(
url: &str,
accept: Option<&str>,
auth: RegistryAuth<'_>,
output_path: Option<&Path>,
) -> Result<HttpResponse, String> {
let mut current = url.to_string();
let mut auth_header: Option<String> = match &auth {
RegistryAuth::None => None,
RegistryAuth::Bearer(t) => Some(format!("Bearer {t}")),
RegistryAuth::Basic(c) => Some(c.basic_header_value()),
};
for _hop in 0..=MAX_REGISTRY_REDIRECTS {
let mut headers: Vec<(&str, &str)> = Vec::new();
if let Some(a) = accept {
headers.push(("Accept", a));
}
if let Some(ah) = &auth_header {
headers.push(("Authorization", ah));
}
let resp = crate::net::http_get_once(¤t, &headers, output_path)?;
if (300..400).contains(&resp.status) {
if let Some(loc) = crate::net::header_value(&resp.headers, "location") {
let next = crate::net::resolve_redirect(¤t, loc);
let cur_host = crate::net::parse_http_url(¤t)
.map(|t| t.1)
.unwrap_or_default();
let next_host = crate::net::parse_http_url(&next)
.map(|t| t.1)
.unwrap_or_default();
if !next_host.eq_ignore_ascii_case(&cur_host) {
auth_header = None; }
current = next;
continue;
}
}
return Ok(HttpResponse {
status: resp.status,
headers: resp.headers,
body: resp.body,
});
}
Err(format!(
"registry: too many redirects (>{MAX_REGISTRY_REDIRECTS}) for {url}"
))
}
pub(super) fn parse_retry_after_header(headers: &str) -> Option<u64> {
let line = headers
.lines()
.find(|l| l.to_ascii_lowercase().starts_with("retry-after:"))?;
line.split_once(':')?.1.trim().parse::<u64>().ok()
}
pub(super) fn parse_bearer_challenge(headers: &str) -> Option<(String, String, String)> {
let line = headers
.lines()
.find(|line| line.to_ascii_lowercase().starts_with("www-authenticate:"))?;
let value = line.split_once(':')?.1.trim();
let params = value.strip_prefix("Bearer ")?;
let mut realm = None;
let mut service = None;
let mut scope = None;
for part in params.split(',') {
let (k, v) = part.trim().split_once('=')?;
let v = v.trim().trim_matches('"').to_owned();
match k {
"realm" => realm = Some(v),
"service" => service = Some(v),
"scope" => scope = Some(v),
_ => {}
}
}
Some((
realm?,
service.unwrap_or_default(),
scope.unwrap_or_default(),
))
}
pub(super) fn url_encode_query(s: &str) -> String {
let mut out = String::new();
for b in s.bytes() {
if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'.' | b'_' | b'~') {
out.push(b as char);
} else {
out.push_str(&format!("%{b:02X}"));
}
}
out
}
pub(super) fn registry_token(
realm: &str,
service: &str,
scope: &str,
creds: Option<&RegistryCreds>,
) -> Result<(String, std::time::Duration), String> {
let mut url = format!("{realm}?service={}", url_encode_query(service));
if !scope.is_empty() {
url.push_str("&scope=");
url.push_str(&url_encode_query(scope));
}
let auth = match creds {
Some(c) => RegistryAuth::Basic(c),
None => RegistryAuth::None,
};
let resp = curl_request(&url, None, auth, None)?;
if resp.status >= 400 {
return Err(format!(
"registry token request failed with HTTP {} (creds={})",
resp.status,
if creds.is_some() { "yes" } else { "no" },
));
}
let json: serde_json::Value =
serde_json::from_slice(&resp.body).map_err(|e| format!("registry token JSON: {e}"))?;
let token = json
.get("token")
.or_else(|| json.get("access_token"))
.and_then(|v| v.as_str())
.map(ToOwned::to_owned)
.ok_or_else(|| "registry token response missing token".to_owned())?;
let expires_in = json
.get("expires_in")
.and_then(|v| v.as_u64())
.unwrap_or(60);
let ttl = std::time::Duration::from_secs(expires_in.saturating_sub(15).max(15));
Ok((token, ttl))
}
fn registry_token_cache(
) -> &'static std::sync::Mutex<std::collections::HashMap<String, (String, std::time::Instant)>> {
static C: std::sync::OnceLock<
std::sync::Mutex<std::collections::HashMap<String, (String, std::time::Instant)>>,
> = std::sync::OnceLock::new();
C.get_or_init(|| std::sync::Mutex::new(std::collections::HashMap::new()))
}
fn token_cache_key(registry: &str, repository: &str) -> String {
format!("{registry}\0{repository}")
}
fn cached_registry_token(registry: &str, repository: &str) -> Option<String> {
let key = token_cache_key(registry, repository);
let cache = registry_token_cache().lock().ok()?;
let (tok, expiry) = cache.get(&key)?;
(*expiry > std::time::Instant::now()).then(|| tok.clone())
}
fn store_registry_token(registry: &str, repository: &str, token: &str, ttl: std::time::Duration) {
if let Ok(mut cache) = registry_token_cache().lock() {
cache.insert(
token_cache_key(registry, repository),
(token.to_owned(), std::time::Instant::now() + ttl),
);
}
}
fn invalidate_registry_token(registry: &str, repository: &str) {
if let Ok(mut cache) = registry_token_cache().lock() {
cache.remove(&token_cache_key(registry, repository));
}
}
pub(super) fn registry_request(
image: &RegistryImageRef,
path: &str,
accept: Option<&str>,
output_path: Option<&Path>,
) -> Result<HttpResponse, String> {
let url = format!(
"https://{}/v2/{}/{}",
image.registry, image.repository, path
);
let creds = docker_config_auth(&image.registry);
if let Some(token) = cached_registry_token(&image.registry, &image.repository) {
let resp = curl_request(&url, accept, RegistryAuth::Bearer(&token), output_path)?;
if resp.status != 401 {
return Ok(resp);
}
invalidate_registry_token(&image.registry, &image.repository);
}
let first = curl_request(&url, accept, RegistryAuth::None, output_path)?;
if first.status != 401 {
return Ok(first);
}
let (realm, service, scope) = parse_bearer_challenge(&first.headers)
.ok_or_else(|| format!("registry auth challenge missing/unsupported for {url}"))?;
let (token, ttl) = registry_token(&realm, &service, &scope, creds.as_ref())?;
store_registry_token(&image.registry, &image.repository, &token, ttl);
curl_request(&url, accept, RegistryAuth::Bearer(&token), output_path)
}
pub(super) fn read_registry_manifest(
image: &RegistryImageRef,
arch: &str,
force_refresh: bool,
) -> Result<RegistryManifest, String> {
let ref_cache_ttl_ms = registry_ref_cache_ttl_ms();
if !force_refresh {
if let Some(cached) = read_registry_ref_cache(image, arch, ref_cache_ttl_ms)? {
return Ok(cached);
}
}
let mut blob_cache_hits = 0usize;
let mut blob_downloads = 0usize;
let t_top = std::time::Instant::now();
let mut resp = registry_request(
image,
&format!("manifests/{}", image.reference),
Some(registry_accept_header()),
None,
)?;
if trace_enabled() {
eprintln!(
"supermachine: registry top-manifest (probe+token+get) wall_ms={}",
t_top.elapsed().as_millis()
);
}
let mut attempt: u32 = 0;
while (resp.status == 429 || resp.status == 503) && attempt < 4 {
let retry_after_secs = parse_retry_after_header(&resp.headers)
.unwrap_or_else(|| 1u64 << attempt) .min(30);
eprintln!(
"registry manifest {}/{}:{} got HTTP {} — retrying in {}s (attempt {}/4)",
image.registry,
image.repository,
image.reference,
resp.status,
retry_after_secs,
attempt + 1,
);
std::thread::sleep(std::time::Duration::from_secs(retry_after_secs));
attempt += 1;
resp = registry_request(
image,
&format!("manifests/{}", image.reference),
Some(registry_accept_header()),
None,
)?;
}
if resp.status >= 400 {
return Err(format!(
"registry manifest request failed for {}/{}:{} with HTTP {}",
image.registry, image.repository, image.reference, resp.status
));
}
verify_pinned_manifest_digest(&image.reference, &resp.body)
.map_err(|e| format!("{}/{}: {e}", image.registry, image.repository))?;
verify_advertised_content_digest(&resp.headers, &resp.body)
.map_err(|e| format!("{}/{}: {e}", image.registry, image.repository))?;
let root: serde_json::Value =
serde_json::from_slice(&resp.body).map_err(|e| format!("registry manifest JSON: {e}"))?;
let manifest_bytes = if root.get("manifests").is_some() {
let desc = find_registry_manifest_descriptor(&root, arch)?;
let digest = desc
.get("digest")
.and_then(|v| v.as_str())
.ok_or_else(|| format!("registry {arch} descriptor missing digest"))?;
let t_arch = std::time::Instant::now();
let (bytes, cache_hit) = read_or_fetch_registry_blob_bytes(
image,
digest,
&format!("manifests/{digest}"),
Some(registry_accept_header()),
)?;
if trace_enabled() {
eprintln!(
"supermachine: registry arch-manifest wall_ms={} cache_hit={cache_hit}",
t_arch.elapsed().as_millis()
);
}
if cache_hit {
blob_cache_hits += 1;
} else {
blob_downloads += 1;
}
bytes
} else {
let manifest_digest = sha256_bytes(&resp.body)?;
store_registry_blob_bytes(&manifest_digest, &resp.body)?;
blob_downloads += 1;
resp.body
};
let manifest: serde_json::Value = serde_json::from_slice(&manifest_bytes)
.map_err(|e| format!("registry {arch} manifest JSON: {e}"))?;
let config_digest = manifest
.get("config")
.and_then(|v| v.get("digest"))
.and_then(|v| v.as_str())
.ok_or_else(|| "registry manifest missing config digest".to_owned())?
.to_owned();
let manifest_digest = sha256_bytes(&manifest_bytes)?;
let t_cfg = std::time::Instant::now();
let (config_bytes, cache_hit) = read_or_fetch_registry_blob_bytes(
image,
&config_digest,
&format!("blobs/{config_digest}"),
None,
)?;
if trace_enabled() {
eprintln!(
"supermachine: registry config-blob wall_ms={} cache_hit={cache_hit}",
t_cfg.elapsed().as_millis()
);
}
if cache_hit {
blob_cache_hits += 1;
} else {
blob_downloads += 1;
}
serde_json::from_slice::<serde_json::Value>(&config_bytes)
.map_err(|e| format!("registry config JSON: {e}"))?;
write_registry_ref_cache(image, arch, &manifest_digest, &config_digest)?;
Ok(RegistryManifest {
manifest_digest,
manifest_bytes,
manifest,
config_digest,
config_bytes,
blob_cache_hits,
blob_downloads,
ref_cache_hit: false,
ref_cache_age_ms: None,
ref_cache_ttl_ms,
})
}
pub(super) fn find_registry_manifest_descriptor(
index: &serde_json::Value,
arch: &str,
) -> Result<serde_json::Value, String> {
let manifests = index
.get("manifests")
.and_then(|v| v.as_array())
.ok_or_else(|| "registry index missing manifests".to_owned())?;
manifests
.iter()
.find(|desc| {
descriptor_platform_arch(desc) == Some(arch)
&& desc
.get("platform")
.and_then(|p| p.get("os"))
.and_then(|v| v.as_str())
.unwrap_or("linux")
== "linux"
})
.cloned()
.ok_or_else(|| format!("registry image has no linux/{arch} manifest"))
}
pub(super) fn verify_pinned_manifest_digest(reference: &str, body: &[u8]) -> Result<(), String> {
let Some(expected_hex) = reference.strip_prefix("sha256:") else {
return Ok(());
};
let actual = sha256_bytes(body)?;
if actual != expected_hex {
return Err(format!(
"registry manifest digest mismatch: pinned sha256:{expected_hex}, server \
returned sha256:{actual} — refusing to bake (possible tampering)"
));
}
Ok(())
}
pub(super) fn parse_content_digest_header(headers: &str) -> Option<String> {
let line = headers
.lines()
.find(|l| l.to_ascii_lowercase().starts_with("docker-content-digest:"))?;
Some(line.split_once(':')?.1.trim().to_owned())
}
pub(super) fn verify_advertised_content_digest(headers: &str, body: &[u8]) -> Result<(), String> {
let Some(advertised) = parse_content_digest_header(headers) else {
return Ok(());
};
let Some(adv_hex) = advertised.strip_prefix("sha256:") else {
return Ok(());
};
let actual = sha256_bytes(body)?;
if actual != adv_hex {
return Err(format!(
"registry advertised Docker-Content-Digest sha256:{adv_hex} but the returned \
body hashes to sha256:{actual} — refusing to bake (registry/proxy mislabeled content)"
));
}
Ok(())
}
pub(super) fn registry_ref_cache_ttl_ms() -> u128 {
std::env::var("SUPERMACHINE_REGISTRY_REF_CACHE_TTL_MS")
.ok()
.and_then(|s| s.parse::<u128>().ok())
.unwrap_or(60_000)
}
pub(super) fn registry_ref_cache_dir() -> PathBuf {
layer_cache_dir().join("registry/refs")
}
pub(super) fn registry_ref_cache_path(
image: &RegistryImageRef,
arch: &str,
) -> Result<PathBuf, String> {
let key = sha256_text(&format!(
"{}\n{}\n{}\nlinux/{arch}\n",
image.registry, image.repository, image.reference
))?;
Ok(registry_ref_cache_dir().join(format!("{key}.json")))
}
pub(super) fn read_registry_ref_cache(
image: &RegistryImageRef,
arch: &str,
ttl_ms: u128,
) -> Result<Option<RegistryManifest>, String> {
if ttl_ms == 0 {
return Ok(None);
}
let path = registry_ref_cache_path(image, arch)?;
let text = match std::fs::read_to_string(&path) {
Ok(text) => text,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(format!("read registry ref cache {}: {e}", path.display())),
};
let json: serde_json::Value = serde_json::from_str(&text)
.map_err(|e| format!("parse registry ref cache {}: {e}", path.display()))?;
if json.get("version").and_then(|v| v.as_u64()) != Some(1) {
return Ok(None);
}
let created_ms = json
.get("created_ms")
.and_then(|v| v.as_u64())
.ok_or_else(|| format!("registry ref cache {} missing created_ms", path.display()))?
as u128;
let age_ms = epoch_ms()?.saturating_sub(created_ms);
if age_ms > ttl_ms {
return Ok(None);
}
let manifest_digest = json
.get("manifest_digest")
.and_then(|v| v.as_str())
.ok_or_else(|| {
format!(
"registry ref cache {} missing manifest_digest",
path.display()
)
})?
.to_owned();
let config_digest = json
.get("config_digest")
.and_then(|v| v.as_str())
.ok_or_else(|| {
format!(
"registry ref cache {} missing config_digest",
path.display()
)
})?
.to_owned();
let manifest_path = registry_blob_cache_path(&manifest_digest)?;
let config_path = registry_blob_cache_path(&config_digest)?;
let manifest_bytes = match std::fs::read(&manifest_path) {
Ok(bytes) => bytes,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => {
return Err(format!(
"read registry manifest cache {}: {e}",
manifest_path.display()
))
}
};
let config_bytes = match std::fs::read(&config_path) {
Ok(bytes) => bytes,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => {
return Err(format!(
"read registry config cache {}: {e}",
config_path.display()
))
}
};
let manifest: serde_json::Value = serde_json::from_slice(&manifest_bytes)
.map_err(|e| format!("registry cached manifest JSON: {e}"))?;
let manifest_config_digest = manifest
.get("config")
.and_then(|v| v.get("digest"))
.and_then(|v| v.as_str())
.ok_or_else(|| "registry cached manifest missing config digest".to_owned())?;
if manifest_config_digest != config_digest {
return Ok(None);
}
serde_json::from_slice::<serde_json::Value>(&config_bytes)
.map_err(|e| format!("registry cached config JSON: {e}"))?;
Ok(Some(RegistryManifest {
manifest_digest,
manifest_bytes,
manifest,
config_digest,
config_bytes,
blob_cache_hits: 2,
blob_downloads: 0,
ref_cache_hit: true,
ref_cache_age_ms: Some(age_ms),
ref_cache_ttl_ms: ttl_ms,
}))
}
pub(super) fn write_registry_ref_cache(
image: &RegistryImageRef,
arch: &str,
manifest_digest: &str,
config_digest: &str,
) -> Result<(), String> {
let path = registry_ref_cache_path(image, arch)?;
let dir = path
.parent()
.ok_or_else(|| format!("registry ref cache path has no parent: {}", path.display()))?;
std::fs::create_dir_all(dir).map_err(|e| format!("create registry ref cache: {e}"))?;
let json = serde_json::json!({
"version": 1,
"created_ms": epoch_ms()?,
"registry": image.registry,
"repository": image.repository,
"reference": image.reference,
"platform": {"os": "linux", "architecture": arch},
"manifest_digest": manifest_digest,
"config_digest": config_digest,
});
let tmp = path.with_extension(format!(
"json.{}.{}.tmp",
std::process::id(),
TEMP_COUNTER.fetch_add(1, Ordering::Relaxed)
));
std::fs::write(
&tmp,
serde_json::to_vec_pretty(&json).map_err(|e| format!("encode registry ref cache: {e}"))?,
)
.map_err(|e| format!("write registry ref cache {}: {e}", tmp.display()))?;
std::fs::rename(&tmp, &path).map_err(|e| {
let _ = std::fs::remove_file(&tmp);
format!(
"install registry ref cache {} -> {}: {e}",
tmp.display(),
path.display()
)
})
}
pub(super) fn registry_blob_cache_dir() -> PathBuf {
layer_cache_dir().join("registry/blobs/sha256")
}
pub(super) fn registry_blob_cache_path(digest: &str) -> Result<PathBuf, String> {
Ok(registry_blob_cache_dir().join(sha256_path_component(digest)?))
}
pub(super) fn registry_blob_tmp_path(sha: &str) -> PathBuf {
let unique = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
registry_blob_cache_dir().join(format!(".{sha}.{}.{}.tmp", std::process::id(), unique))
}
pub(super) fn store_registry_blob_bytes(sha: &str, bytes: &[u8]) -> Result<(), String> {
let cache = registry_blob_cache_path(sha)?;
if cache.is_file() {
return Ok(());
}
let actual = sha256_bytes(bytes)?;
if actual != strip_sha256(sha) {
return Err(format!(
"registry blob digest mismatch: expected {}, got {actual}",
strip_sha256(sha)
));
}
std::fs::create_dir_all(registry_blob_cache_dir())
.map_err(|e| format!("create registry blob cache: {e}"))?;
let tmp = registry_blob_tmp_path(&strip_sha256(sha));
std::fs::write(&tmp, bytes)
.map_err(|e| format!("write registry cache {}: {e}", tmp.display()))?;
match std::fs::rename(&tmp, &cache) {
Ok(()) => Ok(()),
Err(e) if cache.is_file() => {
let _ = std::fs::remove_file(&tmp);
let _ = e;
Ok(())
}
Err(e) => {
let _ = std::fs::remove_file(&tmp);
Err(format!(
"install registry cache {} -> {}: {e}",
tmp.display(),
cache.display()
))
}
}
}
pub(super) fn read_or_fetch_registry_blob_bytes(
image: &RegistryImageRef,
digest: &str,
registry_path: &str,
accept: Option<&str>,
) -> Result<(Vec<u8>, bool), String> {
let cache = registry_blob_cache_path(digest)?;
if cache.is_file() {
let bytes = std::fs::read(&cache)
.map_err(|e| format!("read registry cache {}: {e}", cache.display()))?;
return Ok((bytes, true));
}
let resp = registry_request(image, registry_path, accept, None)?;
if resp.status >= 400 {
return Err(format!("registry blob {digest} HTTP {}", resp.status));
}
store_registry_blob_bytes(digest, &resp.body)?;
Ok((resp.body, false))
}
pub(super) fn copy_or_fetch_registry_blob_to_layout(
image: &RegistryImageRef,
digest: &str,
out: &Path,
) -> Result<bool, String> {
if out.is_file() {
return Ok(true);
}
let cache = registry_blob_cache_path(digest)?;
if cache.is_file() {
std::fs::copy(&cache, out).map_err(|e| {
format!(
"copy registry cache {} -> {}: {e}",
cache.display(),
out.display()
)
})?;
return Ok(true);
}
std::fs::create_dir_all(registry_blob_cache_dir())
.map_err(|e| format!("create registry blob cache: {e}"))?;
let sha = strip_sha256(digest);
let tmp = registry_blob_tmp_path(&sha);
let resp = registry_request(image, &format!("blobs/{digest}"), None, Some(&tmp))?;
if resp.status >= 400 {
let _ = std::fs::remove_file(&tmp);
return Err(format!("registry layer blob {digest} HTTP {}", resp.status));
}
let actual = sha256_file(&tmp)?;
if actual != sha {
let _ = std::fs::remove_file(&tmp);
return Err(format!(
"registry blob digest mismatch: expected {sha}, got {actual}"
));
}
match std::fs::rename(&tmp, &cache) {
Ok(()) => {}
Err(e) if cache.is_file() => {
let _ = std::fs::remove_file(&tmp);
let _ = e;
}
Err(e) => {
let _ = std::fs::remove_file(&tmp);
return Err(format!(
"install registry cache {} -> {}: {e}",
tmp.display(),
cache.display()
));
}
}
std::fs::copy(&cache, out).map_err(|e| {
format!(
"copy registry cache {} -> {}: {e}",
cache.display(),
out.display()
)
})?;
Ok(false)
}
pub(super) fn fetch_registry_to_oci_layout(
image: &RegistryImageRef,
arch: &str,
layout: &Path,
force_refresh: bool,
) -> Result<(), String> {
let _ = std::fs::remove_dir_all(layout);
std::fs::create_dir_all(layout.join("blobs/sha256"))
.map_err(|e| format!("create registry OCI layout {}: {e}", layout.display()))?;
std::fs::write(
layout.join("oci-layout"),
r#"{"imageLayoutVersion":"1.0.0"}"#,
)
.map_err(|e| format!("write OCI layout marker: {e}"))?;
let t_manifest = std::time::Instant::now();
let rm = read_registry_manifest(image, arch, force_refresh)?;
if trace_enabled() {
eprintln!(
"supermachine: registry manifest+auth fetch wall_ms={}",
t_manifest.elapsed().as_millis()
);
}
let mut blob_cache_hits = rm.blob_cache_hits;
let mut blob_downloads = rm.blob_downloads;
if trace_enabled() {
eprintln!(
"supermachine: registry ref cache_hit={} age_ms={} ttl_ms={} cache={}",
rm.ref_cache_hit,
rm.ref_cache_age_ms
.map(|age| age.to_string())
.unwrap_or_default(),
rm.ref_cache_ttl_ms,
registry_ref_cache_dir().display()
);
}
write_oci_blob_bytes(layout, &rm.manifest_digest, &rm.manifest_bytes)?;
write_oci_blob_bytes(layout, &strip_sha256(&rm.config_digest), &rm.config_bytes)?;
let layers = rm
.manifest
.get("layers")
.and_then(|v| v.as_array())
.ok_or_else(|| "registry manifest missing layers".to_owned())?;
use std::sync::atomic::AtomicUsize;
let blob_hits = AtomicUsize::new(blob_cache_hits);
let blob_dls = AtomicUsize::new(blob_downloads);
let blob_jobs: Vec<(String, PathBuf)> = layers
.iter()
.map(|layer| {
let digest = layer
.get("digest")
.and_then(|v| v.as_str())
.ok_or_else(|| "registry layer missing digest".to_owned())?;
let sha = strip_sha256(digest);
let out = layout.join("blobs/sha256").join(&sha);
Ok::<_, String>((digest.to_owned(), out))
})
.collect::<Result<_, _>>()?;
let download_bytes: u64 = layers
.iter()
.filter_map(|l| l.get("size").and_then(|v| v.as_u64()))
.sum();
let dl_t0 = std::time::Instant::now();
let fetch_results: Vec<Result<(), String>> = std::thread::scope(|s| {
let mut handles = Vec::with_capacity(blob_jobs.len());
for (digest, out) in &blob_jobs {
if out.is_file() {
blob_hits.fetch_add(1, Ordering::Relaxed);
continue;
}
let hits_ref = &blob_hits;
let dls_ref = &blob_dls;
handles.push(s.spawn(move || -> Result<(), String> {
if copy_or_fetch_registry_blob_to_layout(image, digest, out)? {
hits_ref.fetch_add(1, Ordering::Relaxed);
} else {
dls_ref.fetch_add(1, Ordering::Relaxed);
}
Ok(())
}));
}
handles
.into_iter()
.map(|h| {
h.join()
.unwrap_or_else(|_| Err("blob fetch thread panicked".to_owned()))
})
.collect()
});
for r in fetch_results {
r?;
}
blob_cache_hits = blob_hits.load(Ordering::Relaxed);
blob_downloads = blob_dls.load(Ordering::Relaxed);
if trace_enabled() && blob_downloads > 0 {
let secs = dl_t0.elapsed().as_secs_f64();
eprintln!(
"supermachine: registry blob download wall_ms={} layers={} bytes={} mbps={:.1}",
dl_t0.elapsed().as_millis(),
blob_jobs.len(),
download_bytes,
if secs > 0.0 {
(download_bytes as f64 / 1.0e6) / secs
} else {
0.0
},
);
}
let index = serde_json::json!({
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.index.v1+json",
"manifests": [{
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"digest": format!("sha256:{}", rm.manifest_digest),
"platform": {"architecture": arch, "os": "linux"}
}]
});
std::fs::write(
layout.join("index.json"),
serde_json::to_vec_pretty(&index).map_err(|e| format!("encode OCI index: {e}"))?,
)
.map_err(|e| format!("write OCI index: {e}"))?;
if trace_enabled() {
eprintln!(
"supermachine: registry blobs cache_hits={} downloads={} cache={}",
blob_cache_hits,
blob_downloads,
registry_blob_cache_dir().display()
);
}
Ok(())
}
pub(super) fn write_oci_blob_bytes(layout: &Path, sha: &str, bytes: &[u8]) -> Result<(), String> {
let path = layout.join("blobs/sha256").join(sha);
if path.is_file() {
return Ok(());
}
std::fs::write(&path, bytes).map_err(|e| format!("write OCI blob {}: {e}", path.display()))
}
#[cfg(test)]
mod registry_ref_tests {
use super::*;
fn parse(s: &str) -> RegistryImageRef {
parse_registry_image_ref(s).expect("parse")
}
#[test]
fn bare_name_defaults_to_dockerhub_library_latest() {
let r = parse("ubuntu");
assert_eq!(r.registry, "registry-1.docker.io");
assert_eq!(r.repository, "library/ubuntu");
assert_eq!(r.reference, "latest");
}
#[test]
fn tag_is_parsed() {
assert_eq!(parse("ubuntu:22.04").reference, "22.04");
assert_eq!(parse("ubuntu:22.04").repository, "library/ubuntu");
}
#[test]
fn namespaced_dockerhub_keeps_namespace_no_library_prefix() {
let r = parse("myorg/myimg:v3");
assert_eq!(r.registry, "registry-1.docker.io");
assert_eq!(r.repository, "myorg/myimg");
assert_eq!(r.reference, "v3");
}
#[test]
fn explicit_registry_with_dot_is_detected() {
let r = parse("ghcr.io/owner/img:tag");
assert_eq!(r.registry, "ghcr.io");
assert_eq!(r.repository, "owner/img");
assert_eq!(r.reference, "tag");
}
#[test]
fn localhost_registry_with_port() {
let r = parse("localhost:5000/img");
assert_eq!(r.registry, "localhost:5000");
assert_eq!(r.repository, "img");
assert_eq!(r.reference, "latest");
}
#[test]
fn deep_repository_path() {
let r = parse("registry.example.com/a/b/c:v1");
assert_eq!(r.registry, "registry.example.com");
assert_eq!(r.repository, "a/b/c");
assert_eq!(r.reference, "v1");
}
#[test]
fn digest_reference() {
let digest = "sha256:0000000000000000000000000000000000000000000000000000000000000000";
let r = parse(&format!("ubuntu@{digest}"));
assert_eq!(r.repository, "library/ubuntu");
assert_eq!(r.reference, digest);
let r2 = parse(&format!("gcr.io/proj/img@{digest}"));
assert_eq!(r2.registry, "gcr.io");
assert_eq!(r2.repository, "proj/img");
assert_eq!(r2.reference, digest);
}
#[test]
fn colon_before_slash_is_a_port_not_a_tag() {
let r = parse("localhost:5000/team/app:1.2");
assert_eq!(r.registry, "localhost:5000");
assert_eq!(r.repository, "team/app");
assert_eq!(r.reference, "1.2");
}
#[test]
fn empty_repository_is_rejected() {
assert!(parse_registry_image_ref("registry.io/").is_err());
}
}
#[cfg(test)]
mod token_cache_tests {
use super::*;
use std::time::Duration;
#[test]
fn fresh_token_is_returned_within_ttl() {
let reg = "tc-fresh.example";
let repo = "lib/app";
assert_eq!(cached_registry_token(reg, repo), None);
store_registry_token(reg, repo, "tok-123", Duration::from_secs(60));
assert_eq!(cached_registry_token(reg, repo).as_deref(), Some("tok-123"));
}
#[test]
fn expired_token_is_not_returned() {
let reg = "tc-expired.example";
let repo = "lib/app";
store_registry_token(reg, repo, "stale", Duration::from_millis(0));
std::thread::sleep(Duration::from_millis(5));
assert_eq!(cached_registry_token(reg, repo), None);
}
#[test]
fn invalidate_drops_the_token() {
let reg = "tc-invalidate.example";
let repo = "lib/app";
store_registry_token(reg, repo, "tok", Duration::from_secs(60));
assert!(cached_registry_token(reg, repo).is_some());
invalidate_registry_token(reg, repo);
assert_eq!(cached_registry_token(reg, repo), None);
}
#[test]
fn keys_are_per_registry_and_repo() {
store_registry_token("tc-keys.a", "r1", "ta", Duration::from_secs(60));
store_registry_token("tc-keys.a", "r2", "tb", Duration::from_secs(60));
store_registry_token("tc-keys.b", "r1", "tc", Duration::from_secs(60));
assert_eq!(
cached_registry_token("tc-keys.a", "r1").as_deref(),
Some("ta")
);
assert_eq!(
cached_registry_token("tc-keys.a", "r2").as_deref(),
Some("tb")
);
assert_eq!(
cached_registry_token("tc-keys.b", "r1").as_deref(),
Some("tc")
);
assert_eq!(cached_registry_token("tc-keys.a", "r3"), None);
}
#[test]
fn store_overwrites_previous_token() {
let reg = "tc-overwrite.example";
let repo = "lib/app";
store_registry_token(reg, repo, "old", Duration::from_secs(60));
store_registry_token(reg, repo, "new", Duration::from_secs(60));
assert_eq!(cached_registry_token(reg, repo).as_deref(), Some("new"));
}
}
#[cfg(test)]
mod header_parse_tests {
use super::*;
#[test]
fn bearer_challenge_dockerhub_form() {
let h = "HTTP/1.1 401 Unauthorized\r\n\
Www-Authenticate: Bearer realm=\"https://auth.docker.io/token\",\
service=\"registry.docker.io\",scope=\"repository:library/nginx:pull\"\r\n\r\n";
let (realm, service, scope) = parse_bearer_challenge(h).expect("challenge");
assert_eq!(realm, "https://auth.docker.io/token");
assert_eq!(service, "registry.docker.io");
assert_eq!(scope, "repository:library/nginx:pull");
}
#[test]
fn bearer_challenge_header_name_is_case_insensitive() {
let h = "www-authenticate: Bearer realm=\"https://r/token\",service=\"svc\"\r\n";
let (realm, service, scope) = parse_bearer_challenge(h).expect("challenge");
assert_eq!(realm, "https://r/token");
assert_eq!(service, "svc");
assert_eq!(scope, "");
}
#[test]
fn bearer_challenge_requires_realm() {
let h = "Www-Authenticate: Bearer service=\"svc\",scope=\"s\"\r\n";
assert!(parse_bearer_challenge(h).is_none());
}
#[test]
fn bearer_challenge_rejects_non_bearer_and_absent() {
assert!(parse_bearer_challenge("Www-Authenticate: Basic realm=\"x\"\r\n").is_none());
assert!(parse_bearer_challenge("Content-Length: 0\r\n").is_none());
}
#[test]
fn retry_after_parses_integer_seconds_only() {
assert_eq!(
parse_retry_after_header("HTTP/1.1 429\r\nRetry-After: 120\r\n\r\n"),
Some(120)
);
assert_eq!(parse_retry_after_header("retry-after: 5 \r\n"), Some(5));
assert_eq!(
parse_retry_after_header("Retry-After: Wed, 21 Oct 2025 07:28:00 GMT\r\n"),
None
);
assert_eq!(parse_retry_after_header("Content-Length: 3\r\n"), None);
}
}