use std::io::Write;
use std::path::Path;
use anyhow::{Context, Result};
use bv_core::lockfile::{CondaPackagePin, LayerDescriptor};
use futures_util::StreamExt as _;
use oci_client::{
Reference,
client::{Client, ClientConfig, ClientProtocol},
secrets::RegistryAuth,
};
use sha2::{Digest, Sha256};
use crate::catalog::LayerCatalog;
use crate::layering::{LayerGroup, PackingStrategy, pack};
use crate::popularity::PopularityMap;
use crate::spec::{ResolvedPackage, ResolvedSpec};
/// Fixed modification time (Unix seconds) written into every tar header by
/// `create_reproducible_layer`, so that entry timestamps never depend on when
/// the files were extracted or the build was run.
const SOURCE_DATE_EPOCH: u64 = 0;
/// An assembled OCI image held entirely in memory: its layers plus the
/// serialized image config.
pub struct OciImage {
    /// Image name; `build` copies it from the resolved spec.
    pub name: String,
    /// Image version; `build` copies it from the resolved spec.
    pub version: String,
    /// Layers in the order they are emitted into the manifest.
    pub layers: Vec<OciLayer>,
    /// Serialized image config JSON; its SHA-256 becomes the manifest's config digest.
    pub config: Vec<u8>,
}
/// A single image layer: the blob bytes plus the metadata needed to reference it.
pub struct OciLayer {
    /// Layer blob bytes: a zstd-compressed tar for layers built in this module,
    /// or the blob exactly as pulled for base-image layers.
    pub compressed: Vec<u8>,
    /// Manifest descriptor (digest, size, media type, optional conda pin) for `compressed`.
    pub descriptor: LayerDescriptor,
    /// Digest that `build_config` lists for this layer under `rootfs.diff_ids`.
    pub uncompressed_digest: String,
}
impl OciImage {
pub fn manifest_json(&self) -> Result<Vec<u8>> {
let config_digest = sha256_hex(&self.config);
let config_size = self.config.len() as u64;
let mut layers_json = String::from("[\n");
for (i, layer) in self.layers.iter().enumerate() {
let comma = if i + 1 == self.layers.len() { "" } else { "," };
layers_json.push_str(&format!(
" {{\"mediaType\":\"{}\",\"digest\":\"{}\",\"size\":{}}}{}\n",
layer.descriptor.media_type, layer.descriptor.digest, layer.descriptor.size, comma,
));
}
layers_json.push(']');
let manifest = format!(
r#"{{
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"config": {{
"mediaType": "application/vnd.oci.image.config.v1+json",
"digest": "sha256:{config_digest}",
"size": {config_size}
}},
"layers": {layers_json}
}}"#
);
Ok(manifest.into_bytes())
}
}
/// Builds a complete in-memory OCI image for `resolved`.
///
/// Layer order in the result is: base-image layers, one layer per non-empty
/// package group returned by `pack`, the conda-meta layer, and finally the
/// entrypoint layer. The image config is generated from that final layer list.
///
/// # Errors
///
/// Fails if the HTTP client cannot be constructed, the base image cannot be
/// fetched, any package group fails to build, or a synthetic layer or the
/// config cannot be produced.
pub async fn build(
    resolved: &ResolvedSpec,
    strategy: &PackingStrategy,
    popularity: Option<&PopularityMap>,
    catalog: Option<&LayerCatalog>,
) -> Result<OciImage> {
    let groups = pack(&resolved.packages, strategy, popularity, catalog);
    let http = reqwest::Client::builder()
        .user_agent("bv-builder/0.1")
        .timeout(std::time::Duration::from_secs(600))
        .build()?;

    let base_ref = match resolved.base.as_deref() {
        Some(explicit) => explicit,
        None => "docker.io/library/debian:12-slim",
    };
    let mut layers = fetch_base_layers(base_ref)
        .await
        .with_context(|| format!("fetch base image '{base_ref}'"))?;

    // Build at most eight groups at a time, fewer on small machines.
    let concurrency = match std::thread::available_parallelism() {
        Ok(cores) => cores.get().min(8),
        Err(_) => 1,
    };

    // `buffered` yields results in group order, so layer order stays stable.
    let outcomes: Vec<Result<Option<OciLayer>>> = futures_util::stream::iter(groups.iter())
        .map(|group| build_group_layer(&http, group))
        .buffered(concurrency)
        .collect()
        .await;
    for outcome in outcomes {
        // Groups that produced no files come back as `None` and are skipped.
        if let Some(layer) = outcome? {
            layers.push(layer);
        }
    }

    layers.push(build_meta_layer(resolved)?);
    layers.push(build_entrypoint_layer(resolved)?);

    let config = build_config(resolved, &layers)?;
    Ok(OciImage {
        name: resolved.name.clone(),
        version: resolved.version.clone(),
        layers,
        config,
    })
}
/// Pulls every layer of the base image `base_ref` into memory.
///
/// The manifest and config are fetched first; each layer blob is then streamed
/// into a buffer. A layer's `uncompressed_digest` is taken from the matching
/// entry of the base config's `rootfs.diff_ids`; when that entry is missing the
/// blob digest is used instead.
///
/// Plain HTTP is only used for `localhost` and `127.0.0.1`. `GITHUB_TOKEN`, when
/// set, is sent only if the parsed registry host is `ghcr.io`.
///
/// # Errors
///
/// Fails if the reference does not parse, the manifest/config or any blob cannot
/// be pulled, the config is not valid JSON, or a layer reports a size that does
/// not fit in `u64`.
async fn fetch_base_layers(base_ref: &str) -> Result<Vec<OciLayer>> {
    use futures_util::StreamExt;
    let reference: Reference = base_ref
        .parse()
        .with_context(|| format!("parse base OCI reference '{base_ref}'"))?;
    let oci_config = ClientConfig {
        protocol: ClientProtocol::HttpsExcept(vec!["localhost".into(), "127.0.0.1".into()]),
        ..Default::default()
    };
    let client = Client::new(oci_config);

    // Decide on credentials from the parsed registry host, not from a substring
    // of the raw reference: a substring match would also hand the token to hosts
    // such as `ghcr.io.example.com` or repositories like `example.com/ghcr.io/x`.
    let registry = reference.registry();
    let registry_host = registry.split(':').next().unwrap_or(registry);
    let auth = if registry_host.eq_ignore_ascii_case("ghcr.io") {
        match std::env::var("GITHUB_TOKEN") {
            Ok(token) => RegistryAuth::Basic("token".into(), token),
            Err(_) => RegistryAuth::Anonymous,
        }
    } else {
        RegistryAuth::Anonymous
    };

    let (manifest, _digest, config_json) = client
        .pull_manifest_and_config(&reference, &auth)
        .await
        .with_context(|| format!("pull manifest+config for '{base_ref}'"))?;
    let base_config: serde_json::Value =
        serde_json::from_str(&config_json).context("parse base image config")?;
    let base_diff_ids = base_config["rootfs"]["diff_ids"]
        .as_array()
        .cloned()
        .unwrap_or_default();

    let mut result = Vec::new();
    for (i, layer_desc) in manifest.layers.iter().enumerate() {
        let digest = &layer_desc.digest;
        let media_type = &layer_desc.media_type;
        // A plain `as u64` cast would turn a negative size into a huge value.
        let size = u64::try_from(layer_desc.size).with_context(|| {
            format!("base layer {digest} reports invalid size {}", layer_desc.size)
        })?;

        let mut compressed = Vec::new();
        let mut stream = client
            .pull_blob_stream(&reference, layer_desc)
            .await
            .with_context(|| format!("pull base layer blob {digest}"))?;
        while let Some(chunk) = stream.next().await {
            let chunk = chunk.with_context(|| format!("read base layer blob {digest}"))?;
            compressed.extend_from_slice(&chunk);
        }

        // Fall back to the blob digest when the config lists no diff_id for this index.
        let uncompressed_digest = base_diff_ids
            .get(i)
            .and_then(|v| v.as_str())
            .unwrap_or(digest)
            .to_string();

        result.push(OciLayer {
            compressed,
            uncompressed_digest,
            descriptor: LayerDescriptor {
                digest: digest.clone(),
                size,
                media_type: media_type.clone(),
                conda_package: None,
            },
        });
    }
    Ok(result)
}
/// Downloads every package in `group` and packs the extracted files into one layer.
///
/// Packages are extracted under `opt/conda` inside a temporary directory, which
/// is then archived with `create_reproducible_layer`. Returns `Ok(None)` when
/// extraction left no regular files under the prefix. A group containing exactly
/// one package records that package as the descriptor's `conda_package` pin.
///
/// # Errors
///
/// Fails if a download, extraction or archive step fails, or if the blocking
/// task cannot be joined.
async fn build_group_layer(
    client: &reqwest::Client,
    group: &LayerGroup,
) -> Result<Option<OciLayer>> {
    let fetches = group
        .packages
        .iter()
        .map(|pkg| download_package(client, pkg));
    let downloaded: Vec<(crate::spec::ResolvedPackage, Vec<u8>)> =
        futures_util::future::try_join_all(fetches).await?;

    // Only single-package layers can be attributed to one conda package.
    let conda_package = (group.packages.len() == 1).then(|| {
        let only = &group.packages[0];
        CondaPackagePin {
            name: only.name.clone(),
            version: only.version.clone(),
            build: only.build.clone(),
            channel: only.channel.clone(),
            sha256: only.sha256.clone(),
        }
    });

    // Extraction and compression are blocking work, so run them off the async executor.
    let assemble = move || -> Result<Option<OciLayer>> {
        let scratch = tempfile::tempdir().context("create temp dir for layer build")?;
        let prefix = scratch.path().join("opt").join("conda");
        std::fs::create_dir_all(&prefix).context("create conda prefix dir")?;

        for (pkg, bytes) in &downloaded {
            extract_package_bytes(pkg, bytes, &prefix)
                .with_context(|| format!("extract {}", pkg.filename))?;
        }
        if !prefix_has_files(&prefix) {
            return Ok(None);
        }

        let (compressed, diff_id) = create_reproducible_layer(scratch.path())?;
        let descriptor = LayerDescriptor {
            digest: format!("sha256:{}", sha256_hex(&compressed)),
            size: compressed.len() as u64,
            media_type: "application/vnd.oci.image.layer.v1.tar+zstd".into(),
            conda_package,
        };
        Ok(Some(OciLayer {
            compressed,
            uncompressed_digest: format!("sha256:{diff_id}"),
            descriptor,
        }))
    };

    tokio::task::spawn_blocking(assemble)
        .await
        .context("layer build task panicked")?
}
/// Reports whether the tree rooted at `dir` contains anything worth packing:
/// at least one regular file or symbolic link, at any depth.
///
/// Symbolic links count as content. Entry types are read without following
/// links, so a link is never recursed into, and a package that ships only
/// symlinks is no longer mistaken for an empty one.
///
/// Unreadable directories and entries are treated as empty (best effort), so a
/// missing `dir` yields `false`.
fn prefix_has_files(dir: &Path) -> bool {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return false;
    };
    entries.flatten().any(|entry| match entry.file_type() {
        // Real directories are searched; a directory symlink lands in the arm below.
        Ok(kind) if kind.is_dir() => prefix_has_files(&entry.path()),
        Ok(kind) => kind.is_file() || kind.is_symlink(),
        Err(_) => false,
    })
}
/// Downloads `pkg.url` into memory and checks it against `pkg.sha256`.
///
/// The checksum comparison is skipped when `pkg.sha256` is empty. On success a
/// clone of `pkg` is returned together with the downloaded bytes.
///
/// # Errors
///
/// Fails if the request cannot be sent, the response status is not a success,
/// the body stream errors, or the SHA-256 does not match.
async fn download_package(
    client: &reqwest::Client,
    pkg: &crate::spec::ResolvedPackage,
) -> Result<(crate::spec::ResolvedPackage, Vec<u8>)> {
    use futures_util::StreamExt;

    let response = client
        .get(&pkg.url)
        .send()
        .await
        .with_context(|| format!("download {}", pkg.url))?;
    let status = response.status();
    if !status.is_success() {
        anyhow::bail!("HTTP {} fetching {}", status, pkg.url);
    }

    let mut body = response.bytes_stream();
    let mut payload = Vec::new();
    while let Some(chunk) = body.next().await {
        let chunk = chunk?;
        payload.extend_from_slice(&chunk);
    }

    if !pkg.sha256.is_empty() {
        let actual = sha256_hex(&payload);
        anyhow::ensure!(
            actual == pkg.sha256,
            "sha256 mismatch for {} ({}): expected {} got {}",
            pkg.name,
            pkg.filename,
            pkg.sha256,
            actual
        );
    }

    Ok((pkg.clone(), payload))
}
/// Extracts a downloaded package archive into `dest`, choosing the format from
/// the suffix of `pkg.filename`.
///
/// `.conda` and `.tar.bz2` are handled; any other file name is ignored and
/// reported as success.
fn extract_package_bytes(
    pkg: &crate::spec::ResolvedPackage,
    bytes: &[u8],
    dest: &Path,
) -> Result<()> {
    let filename = &pkg.filename;
    if filename.ends_with(".conda") {
        return extract_conda_archive(bytes, dest);
    }
    if filename.ends_with(".tar.bz2") {
        return extract_tar_bz2(bytes, dest);
    }
    // Unrecognised formats are skipped rather than treated as an error.
    Ok(())
}
/// Extracts the payload of a `.conda` archive (a zip container) into `dest`.
///
/// Only zip members named `pkg-*.tar.zst` are processed: each one is read fully,
/// zstd-decompressed and unpacked as a tar archive. All other members are skipped.
fn extract_conda_archive(data: &[u8], dest: &Path) -> Result<()> {
    use std::io::Read;

    let mut archive =
        zip::ZipArchive::new(std::io::Cursor::new(data)).context("open .conda zip")?;
    for index in 0..archive.len() {
        let mut member = archive.by_index(index)?;
        let is_payload = {
            let name = member.name();
            name.starts_with("pkg-") && name.ends_with(".tar.zst")
        };
        if !is_payload {
            continue;
        }

        let mut packed = Vec::new();
        member.read_to_end(&mut packed)?;
        let tar_bytes =
            zstd::decode_all(std::io::Cursor::new(packed)).context("decompress pkg- zstd")?;
        extract_tar_bytes(&tar_bytes, dest)?;
    }
    Ok(())
}
/// Unpacks a bzip2-compressed tar archive held in `data` into `dest`,
/// decompressing on the fly while the tar entries are read.
fn extract_tar_bz2(data: &[u8], dest: &Path) -> Result<()> {
    let mut archive = tar::Archive::new(bzip2::read::BzDecoder::new(data));
    unpack_tar_into(&mut archive, dest)
}
/// Unpacks an uncompressed tar archive held in `data` into `dest`.
fn extract_tar_bytes(data: &[u8], dest: &Path) -> Result<()> {
    let reader = std::io::Cursor::new(data);
    unpack_tar_into(&mut tar::Archive::new(reader), dest)
}
/// Unpacks every entry of `archive` beneath `dest`.
///
/// Symlink entries are created by hand rather than through `unpack_in`; every
/// other entry type is delegated to `Entry::unpack_in`. Two failure modes are
/// deliberately tolerated: creating a symlink's parent directory or removing a
/// pre-existing file at the link path is best-effort, and an entry whose unpack
/// fails with `NotADirectory` is skipped.
///
/// # Errors
///
/// Fails if the archive cannot be iterated, a symlink entry has no readable
/// path or target, a symlink cannot be created, or an entry fails to unpack
/// with any error other than `NotADirectory`.
fn unpack_tar_into<R: std::io::Read>(archive: &mut tar::Archive<R>, dest: &Path) -> Result<()> {
    for entry in archive.entries().context("read tar entries")? {
        let mut entry = entry.context("read tar entry")?;
        if entry.header().entry_type() == tar::EntryType::Symlink {
            let entry_path = entry.path().context("read entry path")?;
            let link_name = entry
                .link_name()
                .context("read symlink target")?
                .context("missing symlink target")?;
            // Keep only plain path components, dropping any root, `.` or `..`
            // component, so the link itself is always created somewhere under `dest`.
            let rel: std::path::PathBuf = entry_path
                .components()
                .filter(|c| matches!(c, std::path::Component::Normal(_)))
                .collect();
            let full_path = dest.join(&rel);
            if let Some(parent) = full_path.parent() {
                // Best effort: a failure here surfaces when the symlink is created.
                std::fs::create_dir_all(parent).ok();
            }
            // Best effort: clear a file or link an earlier entry left at this path.
            let _ = std::fs::remove_file(&full_path);
            // The link target is written exactly as stored in the archive.
            // On non-unix targets this call is compiled out, so symlink entries
            // are skipped there.
            #[cfg(unix)]
            std::os::unix::fs::symlink(&*link_name, &full_path)
                .with_context(|| format!("symlink {:?} -> {:?}", full_path, link_name))?;
            continue;
        }
        if let Err(e) = entry.unpack_in(dest) {
            // NOTE(review): presumably raised when an earlier entry put a file or
            // symlink where this entry expects a directory — confirm.
            if e.kind() == std::io::ErrorKind::NotADirectory {
                continue;
            }
            return Err(e).context("unpack tar entry");
        }
    }
    Ok(())
}
fn create_reproducible_layer(dir: &Path) -> Result<(Vec<u8>, String)> {
use std::fs;
let mut entries: Vec<std::path::PathBuf> = Vec::new();
collect_files(dir, &mut entries)?;
entries.sort();
let mut uncompressed: Vec<u8> = Vec::new();
{
let mut builder = tar::Builder::new(&mut uncompressed);
builder.follow_symlinks(false);
for entry_path in &entries {
let rel = entry_path.strip_prefix(dir).unwrap();
let meta = fs::symlink_metadata(entry_path)?;
let mut header = tar::Header::new_ustar();
header.set_metadata(&meta);
header.set_mtime(SOURCE_DATE_EPOCH);
header.set_uid(0);
header.set_gid(0);
header.set_username("")?;
header.set_groupname("")?;
if meta.file_type().is_symlink() {
let target = fs::read_link(entry_path)?;
header.set_size(0);
header.set_entry_type(tar::EntryType::Symlink);
header.set_path(rel)?;
header.set_link_name(&target)?;
header.set_cksum();
builder.append(&header, std::io::empty())?;
} else if meta.is_file() {
let data = fs::read(entry_path)?;
header.set_size(data.len() as u64);
header.set_cksum();
builder.append_data(&mut header, rel, data.as_slice())?;
} else if meta.is_dir() {
header.set_size(0);
header.set_cksum();
builder.append_data(&mut header, rel, std::io::empty())?;
}
}
builder.finish()?;
}
let uncompressed_digest = sha256_hex(&uncompressed);
let compressed =
zstd::encode_all(std::io::Cursor::new(&uncompressed), 19).context("zstd compress layer")?;
Ok((compressed, uncompressed_digest))
}
/// Appends the path of every entry beneath `dir` to `out`, recursively.
///
/// Directories are pushed before their contents. Symlinks are pushed as-is and
/// never followed, even when they point at a directory. Paths keep the `dir`
/// prefix, and the order follows directory iteration order.
fn collect_files(dir: &Path, out: &mut Vec<std::path::PathBuf>) -> Result<()> {
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        let kind = std::fs::symlink_metadata(&path)?.file_type();
        let descend = !kind.is_symlink() && kind.is_dir();
        if descend {
            out.push(path.clone());
            collect_files(&path, out)?;
        } else {
            out.push(path);
        }
    }
    Ok(())
}
/// Builds the layer holding one `opt/conda/conda-meta/<name>-<version>-<build>.json`
/// record per resolved package.
///
/// Each record contains the package's name, version, build, channel, url and
/// sha256, pretty-printed as JSON.
fn build_meta_layer(resolved: &ResolvedSpec) -> Result<OciLayer> {
    let scratch = tempfile::tempdir().context("create temp dir for meta layer")?;
    let meta_dir = scratch
        .path()
        .join("opt")
        .join("conda")
        .join("conda-meta");
    std::fs::create_dir_all(&meta_dir)?;

    for pkg in &resolved.packages {
        let record = serde_json::json!({
            "name": pkg.name,
            "version": pkg.version,
            "build": pkg.build,
            "channel": pkg.channel,
            "url": pkg.url,
            "sha256": pkg.sha256,
        });
        let target = meta_dir.join(format!("{}-{}-{}.json", pkg.name, pkg.version, pkg.build));
        let body = serde_json::to_string_pretty(&record)?;
        std::fs::write(&target, body)?;
    }

    let (compressed, diff_id) = create_reproducible_layer(scratch.path())?;
    let descriptor = LayerDescriptor {
        digest: format!("sha256:{}", sha256_hex(&compressed)),
        size: compressed.len() as u64,
        media_type: "application/vnd.oci.image.layer.v1.tar+zstd".into(),
        conda_package: None,
    };
    Ok(OciLayer {
        compressed,
        uncompressed_digest: format!("sha256:{diff_id}"),
        descriptor,
    })
}
/// Builds the layer containing `bv-entrypoint.sh` at the filesystem root.
///
/// The script is a small `/bin/sh` wrapper that `exec`s its arguments; on unix
/// it is made executable (mode `0o755`). The `_resolved` argument is unused.
fn build_entrypoint_layer(_resolved: &ResolvedSpec) -> Result<OciLayer> {
    let scratch = tempfile::tempdir().context("create temp dir for entrypoint layer")?;
    let script_path = scratch.path().join("bv-entrypoint.sh");

    {
        // Scoped so the file handle is closed before permissions are changed.
        let mut script = std::fs::File::create(&script_path)?;
        for line in [
            "#!/bin/sh",
            "# Generated by bv-builder; do not edit",
            "exec \"$@\"",
        ] {
            writeln!(script, "{line}")?;
        }
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let mut mode = std::fs::metadata(&script_path)?.permissions();
        mode.set_mode(0o755);
        std::fs::set_permissions(&script_path, mode)?;
    }

    let (compressed, diff_id) = create_reproducible_layer(scratch.path())?;
    let descriptor = LayerDescriptor {
        digest: format!("sha256:{}", sha256_hex(&compressed)),
        size: compressed.len() as u64,
        media_type: "application/vnd.oci.image.layer.v1.tar+zstd".into(),
        conda_package: None,
    };
    Ok(OciLayer {
        compressed,
        uncompressed_digest: format!("sha256:{diff_id}"),
        descriptor,
    })
}
/// Serializes the OCI image config for `resolved` and its `layers`.
///
/// The architecture is the second `/`-separated segment of the platform string,
/// defaulting to `amd64` when there is none. `rootfs.diff_ids` lists each
/// layer's `uncompressed_digest` in order. The creation time is fixed and the
/// history is empty.
fn build_config(resolved: &ResolvedSpec, layers: &[OciLayer]) -> Result<Vec<u8>> {
    let platform = resolved.platform.to_string();
    let architecture = platform.split('/').nth(1).unwrap_or("amd64");

    let mut diff_ids: Vec<String> = Vec::with_capacity(layers.len());
    for layer in layers {
        diff_ids.push(layer.uncompressed_digest.clone());
    }

    let config = serde_json::json!({
        "architecture": architecture,
        "os": "linux",
        "created": "1970-01-01T00:00:00Z",
        "author": "bv-builder",
        "config": {
            "Env": [
                "PATH=/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
                "LD_LIBRARY_PATH=/opt/conda/lib",
            ],
            "Labels": {
                "org.opencontainers.image.title": &resolved.name,
                "org.opencontainers.image.version": &resolved.version,
            }
        },
        "rootfs": {
            "type": "layers",
            "diff_ids": diff_ids,
        },
        "history": []
    });

    let bytes = serde_json::to_vec_pretty(&config)?;
    Ok(bytes)
}
/// Lists `(name, version, build, layer_digest)` for every layer of `image` that
/// is pinned to a single conda package.
///
/// Layers without a `conda_package` pin are omitted. The returned string slices
/// borrow from `image`.
pub fn catalog_updates_from_image(image: &OciImage) -> Vec<(&str, &str, &str, &str)> {
    let mut updates = Vec::new();
    for layer in &image.layers {
        let descriptor = &layer.descriptor;
        if let Some(pin) = descriptor.conda_package.as_ref() {
            updates.push((
                pin.name.as_str(),
                pin.version.as_str(),
                pin.build.as_str(),
                descriptor.digest.as_str(),
            ));
        }
    }
    updates
}
/// Counts how many of `packages` are present in `catalog`.
///
/// Returns `(hits, misses)`, where `hits + misses == packages.len()`.
pub fn catalog_coverage(packages: &[ResolvedPackage], catalog: &LayerCatalog) -> (usize, usize) {
    let total = packages.len();
    let mut hits = 0usize;
    for pkg in packages {
        if catalog.contains(&pkg.name, &pkg.version, &pkg.build) {
            hits += 1;
        }
    }
    (hits, total - hits)
}
/// Returns the SHA-256 of `data` as a lowercase hexadecimal string (no prefix).
pub fn sha256_hex(data: &[u8]) -> String {
    let digest = Sha256::digest(data);
    hex::encode(digest)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `sha256_hex` against the known digest of `"hello"`.
    #[test]
    fn sha256_hex_is_correct() {
        let expected = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824";
        assert_eq!(sha256_hex(b"hello"), expected);
    }

    /// Archiving the same directory twice must give identical bytes and digest.
    #[test]
    fn create_reproducible_layer_is_deterministic() {
        let scratch = tempfile::tempdir().unwrap();
        std::fs::write(scratch.path().join("file.txt"), b"content").unwrap();

        let first = create_reproducible_layer(scratch.path()).unwrap();
        let second = create_reproducible_layer(scratch.path()).unwrap();

        assert_eq!(first.0, second.0, "compressed bytes differ between two runs");
        assert_eq!(first.1, second.1, "digests differ between two runs");
    }
}