use crate::blobcache;
use crate::error::{Error, Result};
use crate::layer::Layer;
use async_trait::async_trait;
use oci_client::{
client::{
Client, ClientConfig, ClientProtocol, Config as OciClientConfig, ImageData as OciImageData,
ImageLayer as OciImageLayer,
},
errors::OciDistributionError,
manifest::{ImageIndexEntry, OciImageManifest},
secrets::RegistryAuth,
Reference, RegistryOperation,
};
use crate::IMAGE_LAYER_ZSTD_MEDIA_TYPE;
use oci_spec::image::Arch;
use oci_spec::image::{ImageConfiguration, ImageManifest as SpecImageManifest};
use ocipkg::image::Image as _;
use ocipkg::image::OciArtifact;
use sha2::{Digest, Sha256};
use std::collections::HashSet;
use std::env;
use std::fs as std_fs;
use std::path::{Path, PathBuf};
use tempfile::TempDir;
use tracing::{debug, info, instrument, warn};
/// Runtime-relevant fields read from an image's configuration
/// (`Entrypoint`, `Cmd` and `WorkingDir` of the process config).
#[derive(Debug, Clone)]
pub struct ImageMetadata {
    /// The image's entrypoint, if the config declares one.
    pub entrypoint: Option<Vec<String>>,
    /// The image's default command/arguments, if the config declares them.
    pub cmd: Option<Vec<String>>,
    /// The working directory, if the config declares one.
    pub working_dir: Option<String>,
}
impl ImageMetadata {
    /// Builds the command line a runtime would execute: the entrypoint
    /// followed by `cmd`. Either part may be absent. The result is empty when
    /// both are `None`.
    pub fn runtime_command(&self) -> Vec<String> {
        self.entrypoint
            .iter()
            .chain(self.cmd.iter())
            .flatten()
            .cloned()
            .collect()
    }
}
/// Snapshot of push progress delivered to a [`PushProgressCallback`].
///
/// Layer counts include layers that the registry already had (these are
/// skipped rather than uploaded). Byte counts cover only the layer data that
/// actually has to be uploaded.
#[derive(Debug, Clone)]
pub struct PushProgressInfo {
/// Human-readable description of the current step. Per-layer updates send
/// an empty string.
pub operation: String,
/// Number of layers that are done (already present in the registry or uploaded).
pub layers_uploaded: usize,
/// Number of layers in the push. This is 0 in the initial "Starting push" report.
pub total_layers: usize,
/// Bytes of layer data uploaded so far.
pub bytes_uploaded: u64,
/// Total bytes of layer data that need uploading.
pub total_bytes: u64,
}
/// Observer notified of progress while [`Image::push`] runs.
///
/// `on_progress` is awaited inline by the push, so a slow implementation
/// delays the push itself.
#[async_trait]
pub trait PushProgressCallback: Send + Sync {
/// Receives one progress snapshot.
async fn on_progress(&self, progress: PushProgressInfo);
}
/// Chooses whether the registry client's monolithic blob upload mode is used
/// by [`Image::push`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum MonolithicPushPolicy {
/// Decide per target reference (the default).
/// NOTE(review): the decision is delegated to `determine_use_monolithic_push`,
/// which is defined outside this view.
#[default]
Auto,
/// Always enable monolithic push.
Always,
/// Never enable monolithic push, i.e. use chunked uploads.
Never,
}
/// Options controlling how [`Image::push`] talks to the target registry.
pub struct PushOptions {
    /// Credentials used to authenticate the push.
    pub auth: RegistryAuth,
    /// Protocol used for registry requests.
    pub protocol: ClientProtocol,
    /// Whether the client's monolithic blob upload mode is used.
    pub monolithic_push: MonolithicPushPolicy,
    /// Optional observer notified as the push progresses.
    pub progress_callback: Option<Box<dyn PushProgressCallback>>,
}
impl std::fmt::Debug for PushOptions {
    /// Formats the options without exposing credentials.
    ///
    /// Only anonymous auth is shown as-is. Basic/Bearer auth carries secrets
    /// and is printed as `<redacted>`. The callback is reported only by
    /// presence (`true`/`false`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let auth = if matches!(self.auth, RegistryAuth::Anonymous) {
            "Anonymous"
        } else {
            "<redacted>"
        };
        f.debug_struct("PushOptions")
            .field("auth", &format_args!("{}", auth))
            .field("protocol", &self.protocol)
            .field("monolithic_push", &self.monolithic_push)
            .field("progress_callback", &self.progress_callback.is_some())
            .finish()
    }
}
impl Clone for PushOptions {
    /// Clones everything except the progress callback.
    ///
    /// A `Box<dyn PushProgressCallback>` cannot be cloned, so the clone always
    /// has `progress_callback: None`.
    fn clone(&self) -> Self {
        Self {
            auth: self.auth.clone(),
            protocol: self.protocol.clone(),
            monolithic_push: self.monolithic_push.clone(),
            progress_callback: None,
        }
    }
}
impl Default for PushOptions {
    /// Anonymous auth over HTTPS, automatic push mode, no progress callback.
    fn default() -> Self {
        Self {
            auth: RegistryAuth::Anonymous,
            protocol: ClientProtocol::Https,
            monolithic_push: MonolithicPushPolicy::Auto,
            progress_callback: None,
        }
    }
}
impl PushOptions {
    /// Forces monolithic blob uploads ([`MonolithicPushPolicy::Always`]).
    pub fn with_monolithic_push(mut self) -> Self {
        self.monolithic_push = MonolithicPushPolicy::Always;
        self
    }
    /// Forces chunked blob uploads ([`MonolithicPushPolicy::Never`]).
    pub fn with_chunked_upload(mut self) -> Self {
        self.monolithic_push = MonolithicPushPolicy::Never;
        self
    }
    /// Lets the push decide the upload mode ([`MonolithicPushPolicy::Auto`]).
    pub fn with_auto_push_mode(mut self) -> Self {
        self.monolithic_push = MonolithicPushPolicy::Auto;
        self
    }
    /// Installs a progress observer for the push.
    pub fn with_progress_callback(mut self, callback: Box<dyn PushProgressCallback>) -> Self {
        self.progress_callback = Some(callback);
        self
    }
}
/// Controls when `ImageBuilder::build` fetches the base image manifest from
/// the registry instead of the local blob cache.
///
/// Config and layer blobs are looked up in the cache first under either policy.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PullPolicy {
/// Always pull the manifest from the registry. The pulled manifest is still
/// written to the cache.
Always,
/// Use a cached manifest when one can be read and decoded. Pull only on a
/// cache miss or cache/decoding error. This is the default.
#[default]
Missing,
}
/// Where the base image manifest used by a build came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ManifestSource {
/// Loaded from the local blob cache.
FromCache,
/// Pulled from the registry.
FromRegistry,
/// No base image was used (build from scratch).
NotApplicable,
}
/// Extra information returned alongside the built [`Image`].
#[derive(Debug, Clone)]
pub struct BuildDiagnostics {
/// Where the base image manifest came from.
pub manifest_source: ManifestSource,
/// Digest of the resolved base image manifest. Empty when building from scratch.
pub resolved_manifest_digest: String,
}
#[derive(Debug, Clone, Default)]
pub struct PullAndExtractOptions {
pub platform_os: Option<String>,
pub platform_arch: Option<Arch>,
pub pull_policy: PullPolicy,
pub blob_cache: Option<blobcache::BlobCache>,
pub auth: Option<RegistryAuth>,
pub protocol: ClientProtocol,
}
/// A container image stored as an OCI archive on the local filesystem.
///
/// Obtained via [`Image::from_tarball`] or by finishing an [`ImageBuilder`].
#[derive(Debug)]
pub struct Image {
/// Path of the OCI archive backing this image.
oci_archive_path: PathBuf,
/// Digest of the image configuration blob, as recorded in the archive.
config_digest: String,
/// Optional temporary directory kept alive for the lifetime of the image.
/// Dropping a `TempDir` deletes the directory. `from_tarball` sets it to `None`.
/// NOTE(review): presumably `ImageBuilder::build` stores the temp dir holding
/// the built archive here. Confirm.
_temp_dir_manager: Option<TempDir>,
}
impl Image {
pub fn builder() -> ImageBuilder {
ImageBuilder {
base_image_ref: None,
platform_os: None,
platform_arch: None,
layers: Vec::new(),
entrypoint: None,
cmd: None,
working_dir: None,
output_path: None,
blob_cache: None,
output_image_name_and_tag: None,
pull_policy: None,
auth: None,
protocol: ClientProtocol::Https,
}
}
#[instrument(fields(tarball_path = %tarball_path.as_ref().display()))]
pub fn from_tarball(tarball_path: impl AsRef<Path>) -> Result<Self> {
let tarball_path = tarball_path.as_ref();
if !tarball_path.exists() {
return Err(Error::Generic {
message: format!("OCI tarball not found at {}", tarball_path.display()),
source: None,
});
}
info!("Loading OCI image from tarball: {}", tarball_path.display());
let mut archive =
OciArtifact::from_oci_archive(tarball_path).map_err(|e| Error::OciArchive {
message: format!(
"Failed to load OCI artifact from {}",
tarball_path.display()
),
source: Some(e.into()),
})?;
let (config_desc, _config_bytes) = archive.get_config().map_err(|e| Error::OciArchive {
message: "Failed to get config from OCI artifact".to_string(),
source: Some(e.into()),
})?;
Ok(Self {
oci_archive_path: tarball_path.to_path_buf(),
config_digest: config_desc.digest().to_string(),
_temp_dir_manager: None, })
}
#[instrument(skip(options), fields(image_ref = %image_ref, target_dir = %target_dir.as_ref().display()))]
pub async fn pull_and_extract(
image_ref: &str,
target_dir: impl AsRef<Path>,
options: PullAndExtractOptions,
) -> Result<(PathBuf, ImageMetadata)> {
info!("Pulling and extracting image.");
let mut builder = Image::builder()
.from(image_ref)
.pull_policy(options.pull_policy)
.protocol(options.protocol);
match (options.platform_os, options.platform_arch) {
(Some(os), Some(arch)) => {
builder = builder.platform(&os, &arch);
}
(Some(_), None) => {
warn!("platform_os set without platform_arch; platform filter will not be applied");
}
(None, Some(_)) => {
warn!("platform_arch set without platform_os; platform filter will not be applied");
}
(None, None) => {}
}
if let Some(cache) = options.blob_cache {
builder = builder.blob_cache(cache);
}
if let Some(auth) = options.auth {
builder = builder.auth(auth);
}
let (image, _diagnostics) = builder.build().await?;
image.extract(target_dir).await
}
/// Filesystem path of the OCI archive backing this image.
pub fn path(&self) -> &Path {
    self.oci_archive_path.as_path()
}
/// Digest of this image's configuration blob.
pub fn config_digest(&self) -> &str {
    self.config_digest.as_str()
}
/// Reads entrypoint, cmd and working directory from the image config.
///
/// The archive is re-opened on every call.
pub fn get_metadata(&self) -> Result<ImageMetadata> {
    let archive_path = self.path();
    Self::read_metadata_from_oci_archive(archive_path)
}
fn read_metadata_from_oci_archive(path: &Path) -> Result<ImageMetadata> {
let mut archive = OciArtifact::from_oci_archive(path).map_err(|e| Error::OciArchive {
message: format!("Failed to load OCI artifact from {}", path.display()),
source: Some(e.into()),
})?;
let (_config_desc, config_bytes) = archive.get_config().map_err(|e| Error::OciArchive {
message: "Failed to get config from OCI artifact".to_string(),
source: Some(e.into()),
})?;
let config: ImageConfiguration =
serde_json::from_slice(&config_bytes).map_err(|e| Error::ImageConfig {
message: "Failed to parse image configuration".to_string(),
source: Some(e.into()),
})?;
let entrypoint;
let cmd;
let working_dir;
if let Some(process_config) = config.config() {
entrypoint = process_config.entrypoint().clone();
cmd = process_config.cmd().clone();
working_dir = process_config.working_dir().clone();
} else {
entrypoint = None;
cmd = None;
working_dir = None;
}
Ok(ImageMetadata {
entrypoint,
cmd,
working_dir,
})
}
/// Unpacks every layer of this image into `target_dir` and returns the
/// directory together with the image's runtime metadata.
///
/// The directory is created if needed. Layers are applied one after another
/// in the order the archive returns them. Each layer is decompressed based
/// on its media type:
/// * `+zstd` → zstd;
/// * anything containing `gzip` → gzip;
/// * otherwise plain tar.
///
/// Each layer is unpacked on a blocking thread by
/// `extract_layer_with_whiteouts`. The layer bytes are moved into that
/// thread rather than copied.
///
/// # Errors
///
/// * [`Error::Io`] if the directory cannot be created or a layer fails to
///   decode/unpack.
/// * [`Error::OciArchive`] / [`Error::ImageConfig`] if the archive or its
///   config cannot be read.
/// * [`Error::Generic`] if a blocking extraction task fails to join.
#[instrument(skip(self), fields(image_path = %self.oci_archive_path.display(), target_dir = %target_dir.as_ref().display()))]
pub async fn extract(&self, target_dir: impl AsRef<Path>) -> Result<(PathBuf, ImageMetadata)> {
    let target_dir = target_dir.as_ref();
    info!("Starting image extraction.");
    std_fs::create_dir_all(target_dir).map_err(|e| {
        warn!(error = %e, "Failed to create target directory.");
        Error::Io {
            message: format!("Failed to create target directory {}", target_dir.display()),
            source: e,
        }
    })?;
    info!(path = %self.path().display(), "Loading OCI artifact for extraction.");
    let mut archive = OciArtifact::from_oci_archive(self.path()).map_err(|e| {
        warn!(path = %self.path().display(), error = %e, "Failed to load OCI artifact.");
        Error::OciArchive {
            message: format!("Failed to load OCI artifact from {}", self.path().display()),
            source: Some(e.into()),
        }
    })?;
    let (_config_desc, config_bytes) = archive.get_config().map_err(|e| {
        warn!(error = %e, "Failed to get config from OCI artifact.");
        Error::OciArchive {
            message: "Failed to get config from OCI artifact".to_string(),
            source: Some(e.into()),
        }
    })?;
    let config: ImageConfiguration =
        serde_json::from_slice(&config_bytes).map_err(|e| Error::ImageConfig {
            message: "Failed to parse image configuration".to_string(),
            source: Some(e.into()),
        })?;
    let metadata = match config.config() {
        Some(process_config) => ImageMetadata {
            entrypoint: process_config.entrypoint().clone(),
            cmd: process_config.cmd().clone(),
            working_dir: process_config.working_dir().clone(),
        },
        None => ImageMetadata {
            entrypoint: None,
            cmd: None,
            working_dir: None,
        },
    };
    let layers = archive.get_layers().map_err(|e| {
        warn!(error = %e, "Failed to get layers from OCI artifact.");
        Error::OciArchive {
            message: "Failed to get layers from OCI artifact".to_string(),
            source: Some(e.into()),
        }
    })?;
    // Remember the count up front: the layers are consumed below so their
    // bytes can be moved (not copied) into the blocking tasks.
    let num_layers = layers.len();
    info!(
        num_layers = num_layers,
        "Extracting layers to target directory."
    );
    for (idx, (desc, layer_data)) in layers.into_iter().enumerate() {
        debug!(
            layer_idx = idx,
            layer_digest = %desc.digest(),
            layer_size = layer_data.len(),
            "Extracting layer"
        );
        let target_dir_owned = target_dir.to_path_buf();
        let layer_digest = desc.digest().to_string();
        let media_type = desc.media_type().to_string();
        tokio::task::spawn_blocking(move || -> Result<()> {
            use std::io::Cursor;
            use tar::Archive;
            let cursor = Cursor::new(layer_data);
            let extract_err = |e: std::io::Error| Error::Io {
                message: format!("Failed to extract layer {}", layer_digest),
                source: e,
            };
            if media_type.contains("+zstd") {
                let decoder = zstd::Decoder::new(cursor).map_err(|e| Error::Io {
                    message: format!(
                        "Failed to create zstd decoder for layer {}",
                        layer_digest
                    ),
                    source: e,
                })?;
                extract_layer_with_whiteouts(Archive::new(decoder), &target_dir_owned)
                    .map_err(extract_err)?;
            } else if media_type.contains("gzip") {
                // Covers both OCI `+gzip` and Docker `.tar.gzip` media types.
                let decoder = flate2::read::GzDecoder::new(cursor);
                extract_layer_with_whiteouts(Archive::new(decoder), &target_dir_owned)
                    .map_err(extract_err)?;
            } else {
                extract_layer_with_whiteouts(Archive::new(cursor), &target_dir_owned)
                    .map_err(extract_err)?;
            }
            debug!(layer_digest = %layer_digest, "Layer extracted successfully");
            Ok(())
        })
        .await
        .map_err(|e| {
            warn!(error = %e, "Task join error during layer extraction.");
            Error::Generic {
                message: "Task join error during layer extraction".to_string(),
                source: Some(Box::new(e)),
            }
        })??;
    }
    info!(
        target_dir = %target_dir.display(),
        num_layers = num_layers,
        "Image extraction completed successfully"
    );
    Ok((target_dir.to_path_buf(), metadata))
}
/// Pushes this image to the registry reference `target_image_ref_str`.
///
/// 1. Each distinct layer digest is probed with a same-repository blob mount.
///    Layers the registry accepts that way are treated as already present.
/// 2. If no layers remain, the client's combined `push` uploads the config
///    and manifest.
/// 3. Otherwise the remaining layers are uploaded individually under their
///    manifest descriptor digests, followed by the config blob and then the
///    manifest.
///
/// Progress is reported to `options.progress_callback` when one is set.
///
/// Returns `target_image_ref_str` unchanged on success.
///
/// # Errors
///
/// * [`Error::Generic`]: invalid reference, authentication failure, or a
///   failed upload.
/// * [`Error::OciArchive`]: the local archive, its manifest, config or
///   layers cannot be read.
/// * [`Error::ImageConfig`]: the manifest cannot be converted into the
///   registry client's manifest type.
#[instrument(skip(self, options), fields(image_path = %self.oci_archive_path.display(), target_image_ref = %target_image_ref_str, protocol = ?options.protocol))]
pub async fn push(&self, target_image_ref_str: &str, options: &PushOptions) -> Result<String> {
    info!("Starting image push.");
    // Forwards a snapshot to the optional callback; a no-op when none is set.
    let report_progress = |progress: PushProgressInfo| async {
        if let Some(ref callback) = options.progress_callback {
            callback.on_progress(progress).await;
        }
    };
    report_progress(PushProgressInfo {
        operation: "Starting push".to_string(),
        layers_uploaded: 0,
        total_layers: 0,
        bytes_uploaded: 0,
        total_bytes: 0,
    })
    .await;
    let push_ref = Reference::try_from(target_image_ref_str).map_err(|e| {
        warn!(error = %e, "Invalid target image reference format.");
        Error::Generic {
            message: format!(
                "Invalid target image reference format '{}': {}",
                target_image_ref_str, e
            ),
            source: Some(Box::new(e)),
        }
    })?;
    debug!(push_reference = %push_ref, "Parsed target image reference.");
    let use_monolithic_push =
        determine_use_monolithic_push(&options.monolithic_push, &push_ref);
    let push_client_config = ClientConfig {
        protocol: options.protocol.clone(),
        use_monolithic_push,
        ..Default::default()
    };
    let oci_client = Client::new(push_client_config);
    debug!(
        use_monolithic_push = use_monolithic_push,
        "OCI client for push created."
    );
    info!(path = %self.path().display(), "Loading OCI artifact for push.");
    let mut archive = OciArtifact::from_oci_archive(self.path()).map_err(|e| {
        warn!(path = %self.path().display(), error = %e, "Failed to load OCI artifact.");
        Error::OciArchive {
            message: format!("Failed to load OCI artifact from {}", self.path().display()),
            source: Some(e.into()),
        }
    })?;
    debug!("Converting OCI manifest for client.");
    // The archive library and the registry client use different manifest
    // types, so convert by round-tripping through JSON.
    let spec_mani: SpecImageManifest = archive.get_manifest().map_err(|e| {
        warn!(error = %e, "Failed to get manifest from OCI artifact.");
        Error::OciArchive {
            message: "Failed to get manifest from OCI artifact".to_string(),
            source: Some(e.into()),
        }
    })?;
    let mani_json_bytes = serde_json::to_vec(&spec_mani).map_err(|e| {
        warn!(error = %e, "Failed to serialize spec manifest to JSON.");
        Error::ImageConfig {
            message: "Failed to serialize spec manifest to JSON".to_string(),
            source: Some(e.into()),
        }
    })?;
    let dist_mani: OciImageManifest =
        serde_json::from_slice(&mani_json_bytes).map_err(|e| {
            warn!(error = %e, "Failed to deserialize OCI client manifest from JSON.");
            Error::ImageConfig {
                message: "Failed to deserialize OCI client manifest from JSON".to_string(),
                source: Some(e.into()),
            }
        })?;
    debug!("OCI manifest converted.");
    debug!(
        num_dist_mani_layers = dist_mani.layers.len(),
        dist_mani_layers_digests = ?dist_mani.layers.iter().map(|l| l.digest.as_str()).collect::<Vec<_>>(),
        "Details of dist_mani (the manifest to be pushed)"
    );
    debug!("Preparing image config for OCI client.");
    let (cfg_desc, cfg_bytes) = archive.get_config().map_err(|e| {
        warn!(error = %e, "Failed to get config from OCI artifact.");
        Error::OciArchive {
            message: "Failed to get config from OCI artifact".to_string(),
            source: Some(e.into()),
        }
    })?;
    let cfg_for_push = OciClientConfig {
        data: cfg_bytes,
        media_type: cfg_desc.media_type().to_string(),
        annotations: cfg_desc
            .annotations()
            .clone()
            .map(|h| h.into_iter().collect()),
    };
    debug!(config_media_type = %cfg_for_push.media_type, "Image config prepared.");
    info!(target_registry = %push_ref.registry(), "Authenticating with registry.");
    oci_client
        .auth(&push_ref, &options.auth, RegistryOperation::Push)
        .await
        .map_err(|e| {
            warn!(registry = %push_ref.registry(), error = %e, "Authentication failed for push.");
            Error::Generic {
                message: format!("Authentication failed for push to {}: {}", push_ref, e),
                source: Some(Box::new(e)),
            }
        })?;
    info!("Authentication successful.");
    debug!("Retrieving artifact layers for mounting check.");
    let artifact_layers = archive.get_layers().map_err(|e| {
        warn!(error = %e, "Failed to get layers from OCI artifact.");
        Error::OciArchive {
            message: "Failed to get layers from OCI artifact".to_string(),
            source: Some(e.into()),
        }
    })?;
    debug!(
        num_artifact_layers = artifact_layers.len(),
        artifact_layers_digests = ?artifact_layers.iter().map(|(d, _)| d.digest()).collect::<Vec<_>>(),
        "Details of artifact_layers (layers to be processed for mount/upload)"
    );
    // A same-repository mount that succeeds means the registry already has
    // the blob. Each distinct digest is probed only once.
    let mut mounted_digests = HashSet::new();
    let mut probed_digests = HashSet::new();
    info!("Attempting to mount or verify existing layers to skip upload.");
    for (desc, _) in &artifact_layers {
        let digest_str = desc.digest().to_string();
        if !probed_digests.insert(digest_str.clone()) {
            continue;
        }
        debug!(layer_digest = %digest_str, "Checking if layer already exists via self-mount.");
        match oci_client
            .mount_blob(&push_ref, &push_ref, &digest_str)
            .await
        {
            Ok(_) => {
                info!(layer_digest = %digest_str, "Layer already exists in registry (mount returned 201).");
                mounted_digests.insert(digest_str);
            }
            Err(e) => {
                debug!(layer_digest = %digest_str, error = %e, "Self-mount failed, layer will be uploaded.");
            }
        }
    }
    let total_layers = artifact_layers.len();
    // Keep each blob paired with its manifest descriptor digest so the upload
    // uses exactly the digest the manifest references (no re-hashing), and
    // move the bytes rather than copying them.
    let layers_to_push: Vec<(String, _)> = artifact_layers
        .into_iter()
        .filter(|(d, _)| !mounted_digests.contains(d.digest()))
        .map(|(d, data)| (d.digest().to_string(), data))
        .collect();
    // Count skipped layer entries (not distinct digests) so progress totals
    // stay consistent when a digest appears more than once.
    let skipped_layers = total_layers - layers_to_push.len();
    info!(
        num_layers_skipped = skipped_layers,
        "Finished attempting to mount/verify layers."
    );
    let total_push_size_bytes: usize = layers_to_push.iter().map(|(_, data)| data.len()).sum();
    info!(
        num_layers_to_push = layers_to_push.len(),
        num_total_layers = total_layers,
        total_push_size_mb = total_push_size_bytes / (1024 * 1024),
        "Preparing to push layers."
    );
    report_progress(PushProgressInfo {
        operation: "Uploading layers".to_string(),
        layers_uploaded: skipped_layers,
        total_layers,
        bytes_uploaded: 0,
        total_bytes: total_push_size_bytes as u64,
    })
    .await;
    if layers_to_push.is_empty() {
        info!("All layers already exist in the registry. Pushing config and manifest.");
        report_progress(PushProgressInfo {
            operation: "Uploading config and manifest".to_string(),
            layers_uploaded: total_layers,
            total_layers,
            bytes_uploaded: total_push_size_bytes as u64,
            total_bytes: total_push_size_bytes as u64,
        })
        .await;
        oci_client
            .push(&push_ref, &[], cfg_for_push, &options.auth, Some(dist_mani))
            .await
            .map_err(|e| {
                warn!(error = %e, "OCI client push failed.");
                Error::Generic {
                    message: format!("OCI client push to {} failed: {}", push_ref, e),
                    source: Some(Box::new(e)),
                }
            })?;
        report_progress(PushProgressInfo {
            operation: "Push completed".to_string(),
            layers_uploaded: total_layers,
            total_layers,
            bytes_uploaded: total_push_size_bytes as u64,
            total_bytes: total_push_size_bytes as u64,
        })
        .await;
        info!(image_ref = %target_image_ref_str, "Image push successful (config and manifest only).");
        return Ok(target_image_ref_str.to_string());
    }
    info!("Pushing image (layers, config, manifest).");
    let operation_text = if total_push_size_bytes > 10 * 1024 * 1024 {
        format!(
            "Uploading {:.1} MB in {} layers",
            total_push_size_bytes as f64 / (1024.0 * 1024.0),
            layers_to_push.len()
        )
    } else {
        format!("Uploading {} layers", layers_to_push.len())
    };
    report_progress(PushProgressInfo {
        operation: operation_text,
        layers_uploaded: skipped_layers,
        total_layers,
        bytes_uploaded: 0,
        total_bytes: total_push_size_bytes as u64,
    })
    .await;
    let mut uploaded_bytes = 0u64;
    let mut uploaded_layers = skipped_layers;
    let num_to_push = layers_to_push.len();
    for (i, (digest, data)) in layers_to_push.iter().enumerate() {
        info!(
            "Uploading layer {}/{}: {}",
            i + 1,
            num_to_push,
            digest
        );
        oci_client
            .push_blob(&push_ref, data, digest)
            .await
            .map_err(|e| {
                warn!(error = %e, "Failed to push layer {}", digest);
                Error::Generic {
                    message: format!("Failed to push layer {}: {}", digest, e),
                    source: Some(Box::new(e)),
                }
            })?;
        uploaded_bytes += data.len() as u64;
        uploaded_layers += 1;
        report_progress(PushProgressInfo {
            operation: String::new(),
            layers_uploaded: uploaded_layers,
            total_layers,
            bytes_uploaded: uploaded_bytes,
            total_bytes: total_push_size_bytes as u64,
        })
        .await;
    }
    info!("Uploading config blob");
    report_progress(PushProgressInfo {
        operation: "Uploading config".to_string(),
        layers_uploaded: uploaded_layers,
        total_layers,
        bytes_uploaded: uploaded_bytes,
        total_bytes: total_push_size_bytes as u64,
    })
    .await;
    oci_client
        .push_blob(&push_ref, &cfg_for_push.data, &dist_mani.config.digest)
        .await
        .map_err(|e| {
            warn!(error = %e, "Failed to push config blob");
            Error::Generic {
                message: format!("Failed to push config blob: {}", e),
                source: Some(Box::new(e)),
            }
        })?;
    info!("Uploading manifest");
    report_progress(PushProgressInfo {
        operation: "Uploading manifest".to_string(),
        layers_uploaded: uploaded_layers,
        total_layers,
        bytes_uploaded: uploaded_bytes,
        total_bytes: total_push_size_bytes as u64,
    })
    .await;
    oci_client
        .push_manifest(&push_ref, &dist_mani.into())
        .await
        .map_err(|e| {
            warn!(error = %e, "Failed to push manifest");
            Error::Generic {
                message: format!("Failed to push manifest: {}", e),
                source: Some(Box::new(e)),
            }
        })?;
    report_progress(PushProgressInfo {
        operation: "Push completed".to_string(),
        layers_uploaded: total_layers,
        total_layers,
        bytes_uploaded: total_push_size_bytes as u64,
        total_bytes: total_push_size_bytes as u64,
    })
    .await;
    info!(image_ref = %target_image_ref_str, "Image push successful.");
    Ok(target_image_ref_str.to_string())
}
}
/// Builder for [`Image`]s. Create one with [`Image::builder`] and finish with `build()`.
///
/// Every field is optional. Without a base image the build starts from
/// scratch. Without an explicit platform, `build()` targets `linux`/`amd64`.
#[derive(Default)]
pub struct ImageBuilder {
/// Registry reference of the base image. `None` builds from scratch.
base_image_ref: Option<String>,
/// Target OS. When set together with `platform_arch`, it also selects the
/// matching entry of a multi-platform index.
platform_os: Option<String>,
/// Target architecture. Used together with `platform_os` as above.
platform_arch: Option<Arch>,
/// Layers appended, in order, after the base image's layers.
layers: Vec<Layer>,
/// Entrypoint override. Setting it without `cmd` clears the base image's cmd.
entrypoint: Option<Vec<String>>,
/// Cmd override.
cmd: Option<Vec<String>>,
/// Working directory override.
working_dir: Option<String>,
/// Where the built archive should be written.
/// NOTE(review): consumed later in `build()`, outside this view. Confirm semantics.
output_path: Option<PathBuf>,
/// Cache for manifests, config and layer blobs. A default cache is created
/// when unset and a base image is used.
blob_cache: Option<blobcache::BlobCache>,
/// Name and tag for the output image. A base image build derives
/// `<repository>:latest` (scratch builds `scratch:latest`) as a default.
/// NOTE(review): how this field overrides that default is outside this view.
output_image_name_and_tag: Option<String>,
/// Manifest pull policy. Falls back to `PullPolicy::default()` (`Missing`) when unset.
pull_policy: Option<PullPolicy>,
/// Registry credentials. When unset, `determine_registry_auth` chooses them.
auth: Option<RegistryAuth>,
/// Registry protocol. `Image::builder` sets HTTPS; the derived `Default`
/// uses `ClientProtocol::default()`.
protocol: ClientProtocol,
}
impl ImageBuilder {
pub fn from(mut self, base_image_ref: &str) -> Self {
self.base_image_ref = Some(base_image_ref.to_string());
self
}
pub fn platform(mut self, os: &str, arch: &Arch) -> Self {
self.platform_os = Some(os.to_string());
self.platform_arch = Some(arch.clone());
self
}
pub fn layer(mut self, layer: Layer) -> Self {
self.layers.push(layer);
self
}
pub fn entrypoint(mut self, entrypoint: Vec<String>) -> Self {
self.entrypoint = Some(entrypoint);
self
}
pub fn cmd(mut self, cmd: Vec<String>) -> Self {
self.cmd = Some(cmd);
self
}
pub fn working_dir(mut self, working_dir: &str) -> Self {
self.working_dir = Some(working_dir.to_string());
self
}
pub fn output_to(mut self, path: PathBuf) -> Self {
self.output_path = Some(path);
self
}
pub fn blob_cache(mut self, cache: blobcache::BlobCache) -> Self {
self.blob_cache = Some(cache);
self
}
pub fn output_name_and_tag(mut self, name_and_tag: &str) -> Self {
self.output_image_name_and_tag = Some(name_and_tag.to_string());
self
}
pub fn pull_policy(mut self, policy: PullPolicy) -> Self {
self.pull_policy = Some(policy);
self
}
pub fn protocol(mut self, protocol: ClientProtocol) -> Self {
self.protocol = protocol;
self
}
pub fn auth(mut self, auth: RegistryAuth) -> Self {
self.auth = Some(auth);
self
}
#[instrument(skip_all, fields(
base_image_ref = ?self.base_image_ref,
platform_os = ?self.platform_os,
platform_arch = ?self.platform_arch,
num_layers_to_add = self.layers.len(),
output_path = ?self.output_path
))]
pub async fn build(mut self) -> Result<(Image, BuildDiagnostics)> {
info!("Starting image build.");
let target_os_for_build = self
.platform_os
.clone()
.unwrap_or_else(|| "linux".to_string());
let target_arch_for_build = self.platform_arch.clone().unwrap_or(Arch::Amd64);
let base_image_data: Option<OciImageData>;
let manifest_source: ManifestSource;
let resolved_manifest_digest_str: String;
let default_image_name: String;
if let Some(base_image_ref_str) = &self.base_image_ref {
info!("Building from base image: {}", base_image_ref_str);
let base_ref =
Reference::try_from(base_image_ref_str.as_str()).map_err(|e| Error::ImagePull {
image_ref: base_image_ref_str.to_string(),
message: format!("Invalid base image reference format: {}", e),
source: Some(Box::new(e)),
})?;
let pull_auth = self
.auth
.take()
.unwrap_or_else(|| determine_registry_auth(&base_ref));
let pull_policy = self.pull_policy.take().unwrap_or_default();
let cache = match self.blob_cache.take() {
Some(c) => c,
None => {
debug!("No BlobCache provided to ImageBuilder, creating a default one.");
blobcache::BlobCache::new()?
}
};
let mut client_cfg = ClientConfig {
protocol: self.protocol.clone(),
..Default::default()
};
if let (Some(os_filter_val), Some(arch_filter_val)) =
(&self.platform_os, &self.platform_arch)
{
let os_filter_cloned = os_filter_val.clone();
let arch_filter_cloned = arch_filter_val.to_string();
client_cfg.platform_resolver = Some(Box::new(
move |index_entries: &[ImageIndexEntry]| {
info!(target_os = %os_filter_cloned, target_arch = %arch_filter_cloned, num_index_entries = index_entries.len(), "Platform resolver: Attempting to find match.");
for entry in index_entries {
if let Some(p) = entry.platform.as_ref() {
if p.os == os_filter_cloned && p.architecture == arch_filter_cloned
{
info!(resolver_selected_digest = %entry.digest, "Platform resolver: Found a match.");
return Some(entry.digest.clone());
}
}
}
warn!(target_os = %os_filter_cloned, target_arch = %arch_filter_cloned, "Platform resolver: No match found.");
None
},
));
} else {
info!("Platform resolver not configured as os/arch were not explicitly provided to ImageBuilder. Default oci-client resolver will be used if necessary.");
}
let oci_client = Client::new(client_cfg);
info!(base_image_ref = %base_image_ref_str, pull_policy = ?pull_policy, "Attempting to pull/load resolved base image manifest.");
let pull_err_mapper = |e: OciDistributionError| {
warn!(base_image_ref = %base_image_ref_str, error = %e, "Failed to pull and resolve base image manifest.");
Error::ImagePull {
image_ref: base_image_ref_str.to_string(),
message: format!(
"Failed to pull/resolve base image manifest ({}): {}",
base_image_ref_str, e
),
source: Some(Box::new(e)),
}
};
let manifest_cache_key = format!(
"manifest-v2:{}:{}:{}",
base_ref.whole(),
target_os_for_build.as_str(),
target_arch_for_build,
);
let manifest_source_temp: ManifestSource;
let (base_image_manifest_resolved, resolved_manifest_digest_str_temp) = if pull_policy
== PullPolicy::Missing
{
debug!(key = %manifest_cache_key, "PullPolicy::Missing. Attempting to load manifest from cache.");
match cache.get_blob(&manifest_cache_key).await {
Ok(Some(cached_data)) => {
match serde_json::from_slice::<(OciImageManifest, String)>(&cached_data) {
Ok((manifest, digest)) => {
info!(key = %manifest_cache_key, resolved_digest = %digest, "Manifest cache hit and deserialized successfully.");
manifest_source_temp = ManifestSource::FromCache;
(manifest, digest)
}
Err(e) => {
warn!(key = %manifest_cache_key, error = %e, "Failed to deserialize cached manifest. Will pull from registry.");
manifest_source_temp = ManifestSource::FromRegistry;
let (pulled_manifest, pulled_digest) = oci_client
.pull_image_manifest(&base_ref, &pull_auth)
.await
.map_err(pull_err_mapper)?;
match serde_json::to_vec(&(
pulled_manifest.clone(),
pulled_digest.clone(),
)) {
Ok(data_to_cache) => {
if let Err(cache_err) = cache
.put_blob(&manifest_cache_key, &data_to_cache)
.await
{
warn!(key = %manifest_cache_key, error = %cache_err, "Failed to cache manifest after pull.");
}
}
Err(ser_err) => {
warn!(key = %manifest_cache_key, error = %ser_err, "Failed to serialize manifest for caching after pull.");
}
}
(pulled_manifest, pulled_digest)
}
}
}
Ok(None) => {
info!(key = %manifest_cache_key, "Manifest cache miss (no entry found). Will pull from registry.");
manifest_source_temp = ManifestSource::FromRegistry;
let (pulled_manifest, pulled_digest) = oci_client
.pull_image_manifest(&base_ref, &pull_auth)
.await
.map_err(pull_err_mapper)?;
match serde_json::to_vec(&(pulled_manifest.clone(), pulled_digest.clone()))
{
Ok(data_to_cache) => {
if let Err(cache_err) =
cache.put_blob(&manifest_cache_key, &data_to_cache).await
{
warn!(key = %manifest_cache_key, error = %cache_err, "Failed to cache manifest after pull.");
}
}
Err(ser_err) => {
warn!(key = %manifest_cache_key, error = %ser_err, "Failed to serialize manifest for caching after pull.");
}
}
(pulled_manifest, pulled_digest)
}
Err(e) => {
warn!(key = %manifest_cache_key, error = %e, "Error reading manifest from cache. Will pull from registry.");
manifest_source_temp = ManifestSource::FromRegistry;
let (pulled_manifest, pulled_digest) = oci_client
.pull_image_manifest(&base_ref, &pull_auth)
.await
.map_err(pull_err_mapper)?;
match serde_json::to_vec(&(pulled_manifest.clone(), pulled_digest.clone()))
{
Ok(data_to_cache) => {
if let Err(cache_err) =
cache.put_blob(&manifest_cache_key, &data_to_cache).await
{
warn!(key = %manifest_cache_key, error = %cache_err, "Failed to cache manifest after pull.");
}
}
Err(ser_err) => {
warn!(key = %manifest_cache_key, error = %ser_err, "Failed to serialize manifest for caching after pull.");
}
}
(pulled_manifest, pulled_digest)
}
}
} else {
info!(key = %manifest_cache_key, "PullPolicy::Always. Pulling manifest from registry.");
manifest_source_temp = ManifestSource::FromRegistry;
let (pulled_manifest, pulled_digest) = oci_client
.pull_image_manifest(&base_ref, &pull_auth)
.await
.map_err(pull_err_mapper)?;
match serde_json::to_vec(&(pulled_manifest.clone(), pulled_digest.clone())) {
Ok(data_to_cache) => {
if let Err(cache_err) =
cache.put_blob(&manifest_cache_key, &data_to_cache).await
{
warn!(key = %manifest_cache_key, error = %cache_err, "Failed to cache manifest after pull.");
} else {
debug!(key = %manifest_cache_key, "Successfully cached manifest after pull.");
}
}
Err(ser_err) => {
warn!(key = %manifest_cache_key, error = %ser_err, "Failed to serialize manifest for caching after pull.");
}
}
(pulled_manifest, pulled_digest)
};
info!(manifest_digest = %resolved_manifest_digest_str_temp, "Successfully obtained base ImageManifest (source: {:?}).", manifest_source_temp);
let config_descriptor = &base_image_manifest_resolved.config;
info!(config_digest = %config_descriptor.digest, "Fetching base image config blob.");
let config_data = match cache.get_blob(&config_descriptor.digest).await? {
Some(data) => {
info!(config_digest = %config_descriptor.digest, "Base image config blob found in cache.");
data
}
None => {
info!(config_digest = %config_descriptor.digest, "Base image config blob not in cache, pulling.");
let mut pulled_data = Vec::new();
oci_client
.pull_blob(&base_ref, config_descriptor, &mut pulled_data)
.await
.map_err(|e| Error::ImagePull {
image_ref: base_image_ref_str.to_string(),
message: format!(
"Failed to pull config blob {}",
config_descriptor.digest
),
source: Some(Box::new(e)),
})?;
cache
.put_blob(&config_descriptor.digest, &pulled_data)
.await?;
pulled_data
}
};
let oci_client_config_for_imagedata = OciClientConfig {
data: config_data,
media_type: config_descriptor.media_type.clone(),
annotations: config_descriptor.annotations.clone(),
};
info!(
num_base_layers = base_image_manifest_resolved.layers.len(),
"Fetching base image layer blobs."
);
let mut oci_client_layers = Vec::new();
for (idx, layer_descriptor) in base_image_manifest_resolved.layers.iter().enumerate() {
let layer_data = match cache.get_blob(&layer_descriptor.digest).await? {
Some(data) => {
info!(layer_idx = idx, layer_digest = %layer_descriptor.digest, "Base layer blob found in cache.");
data
}
None => {
info!(layer_idx = idx, layer_digest = %layer_descriptor.digest, "Base layer blob not in cache, pulling.");
let mut pulled_data = Vec::new();
oci_client
.pull_blob(&base_ref, layer_descriptor, &mut pulled_data)
.await
.map_err(|e| Error::ImagePull {
image_ref: base_image_ref_str.to_string(),
message: format!(
"Failed to pull layer blob {}",
layer_descriptor.digest
),
source: Some(Box::new(e)),
})?;
cache
.put_blob(&layer_descriptor.digest, &pulled_data)
.await?;
pulled_data
}
};
oci_client_layers.push(OciImageLayer {
data: layer_data,
media_type: layer_descriptor.media_type.clone(),
annotations: layer_descriptor.annotations.clone(),
});
}
manifest_source = manifest_source_temp;
resolved_manifest_digest_str = resolved_manifest_digest_str_temp;
base_image_data = Some(OciImageData {
layers: oci_client_layers,
digest: Some(resolved_manifest_digest_str.clone()),
config: oci_client_config_for_imagedata,
manifest: Some(base_image_manifest_resolved.clone()),
});
default_image_name = format!("{}:latest", base_ref.repository());
} else {
info!("Building from scratch (no base image)");
base_image_data = None;
manifest_source = ManifestSource::NotApplicable;
resolved_manifest_digest_str = String::new();
default_image_name = "scratch:latest".to_string();
}
let build_artifacts_dir = tempfile::tempdir().map_err(|e| Error::Io {
message: "Failed to create temporary directory for image build artifacts".to_string(),
source: e,
})?;
let current_config: ImageConfiguration = if let Some(ref base_data) = base_image_data {
let mut config: ImageConfiguration = serde_json::from_slice(&base_data.config.data)
.map_err(|e| Error::ImageConfig {
message: "Failed to parse base image configuration".to_string(),
source: Some(Box::new(e)),
})?;
let mut all_diff_ids: Vec<String> = config.rootfs().diff_ids().clone();
for new_layer in &self.layers {
all_diff_ids.push(new_layer.diff_id().to_string());
}
*config.rootfs_mut().diff_ids_mut() = all_diff_ids;
let mut proc_config = config.config().clone().unwrap_or_default();
if let Some(entrypoint) = self.entrypoint {
proc_config.set_entrypoint(Some(entrypoint));
if self.cmd.is_none() {
proc_config.set_cmd(None);
}
}
if let Some(cmd) = self.cmd {
proc_config.set_cmd(Some(cmd));
}
if let Some(working_dir) = self.working_dir {
proc_config.set_working_dir(Some(working_dir));
}
config.set_os(target_os_for_build.as_str().into());
config.set_architecture(target_arch_for_build.to_string().as_str().into());
config.set_config(Some(proc_config));
config
} else {
use oci_spec::image::{ConfigBuilder, ImageConfigurationBuilder, RootFsBuilder};
let diff_ids: Vec<String> = self
.layers
.iter()
.map(|layer| layer.diff_id().to_string())
.collect();
let rootfs = RootFsBuilder::default()
.typ("layers")
.diff_ids(diff_ids)
.build()
.map_err(|e| Error::ImageConfig {
message: format!("Failed to build rootfs: {}", e),
source: Some(Box::new(e)),
})?;
let mut config_builder = ConfigBuilder::default();
if let Some(entrypoint) = self.entrypoint {
config_builder = config_builder.entrypoint(entrypoint);
}
if let Some(cmd) = self.cmd {
config_builder = config_builder.cmd(cmd);
}
if let Some(working_dir) = self.working_dir {
config_builder = config_builder.working_dir(working_dir);
}
let proc_config = config_builder.build().map_err(|e| Error::ImageConfig {
message: format!("Failed to build config: {}", e),
source: Some(Box::new(e)),
})?;
ImageConfigurationBuilder::default()
.os(target_os_for_build.as_str())
.architecture(target_arch_for_build.to_string().as_str())
.rootfs(rootfs)
.config(proc_config)
.build()
.map_err(|e| Error::ImageConfig {
message: format!("Failed to build image configuration: {}", e),
source: Some(Box::new(e)),
})?
};
let config_json_bytes =
serde_json::to_vec(¤t_config).map_err(|e| Error::ImageConfig {
message: "Failed to serialize new image configuration".to_string(),
source: Some(Box::new(e)),
})?;
let config_digest_sha256 = {
let mut hasher = Sha256::new();
hasher.update(&config_json_bytes);
format!("sha256:{:x}", hasher.finalize())
};
let mut oci_tar_builder = oci_tar_builder::Builder::default();
if let Some(ref base_data) = base_image_data {
for (idx, base_layer_oci) in base_data.layers.iter().enumerate() {
let temp_layer_path = build_artifacts_dir
.path()
.join(format!("base_layer_{}.blob", idx));
std_fs::write(&temp_layer_path, &base_layer_oci.data).map_err(|e| Error::Io {
message: format!("Failed to write base layer {} to temp file", idx),
source: e,
})?;
oci_tar_builder
.add_layer_with_media_type(&temp_layer_path, base_layer_oci.media_type.clone());
}
}
for new_layer in &self.layers {
oci_tar_builder.add_layer_with_media_type(
&new_layer.path().to_path_buf(),
IMAGE_LAYER_ZSTD_MEDIA_TYPE.to_string(),
);
}
let image_name_and_tag_for_config = self
.output_image_name_and_tag
.clone()
.unwrap_or(default_image_name);
oci_tar_builder.add_config(current_config.clone(), image_name_and_tag_for_config);
let (oci_archive_final_path, temp_dir_manager_for_image_struct) = if let Some(output_p) =
self.output_path
{
if let Some(parent_dir) = output_p.parent() {
if !parent_dir.exists() {
std_fs::create_dir_all(parent_dir).map_err(|e| Error::Io {
message: format!(
"Failed to create parent directory for output OCI archive: {}",
parent_dir.display()
),
source: e,
})?;
}
}
(output_p, None)
} else {
let final_oci_temp_dir = tempfile::tempdir().map_err(|e| Error::Io {
message: "Failed to create temporary directory for final OCI archive".to_string(),
source: e,
})?;
(
final_oci_temp_dir.path().join("image.oci.tar"),
Some(final_oci_temp_dir),
)
};
let oci_archive_file =
std_fs::File::create(&oci_archive_final_path).map_err(|e| Error::OciArchive {
message: format!(
"Failed to create OCI archive file at {}",
oci_archive_final_path.display()
),
source: Some(Box::new(e)),
})?;
oci_tar_builder
.build(oci_archive_file)
.map_err(|e| Error::OciArchive {
message: format!("OCI tar builder failed: {}", e),
source: Some(e.into()),
})?;
let diagnostics = BuildDiagnostics {
manifest_source,
resolved_manifest_digest: resolved_manifest_digest_str.clone(),
};
Ok((
Image {
oci_archive_path: oci_archive_final_path,
config_digest: config_digest_sha256,
_temp_dir_manager: temp_dir_manager_for_image_struct,
},
diagnostics,
))
}
}
/// Picks the credentials used to talk to the registry named by `reference`.
///
/// When both `DOCKER_USERNAME` and `DOCKER_PASSWORD` are present in the
/// environment with non-empty values, they are returned as
/// `RegistryAuth::Basic`. Otherwise (unset, empty, or not valid unicode)
/// the function falls back to `RegistryAuth::Anonymous`. The resolved
/// registry host appears only in the log messages.
fn determine_registry_auth(reference: &Reference) -> RegistryAuth {
    let registry_host = reference.resolve_registry();
    // Treat an empty value exactly like a missing variable.
    let non_empty_var = |key: &str| env::var(key).ok().filter(|value| !value.is_empty());
    let credentials = (
        non_empty_var("DOCKER_USERNAME"),
        non_empty_var("DOCKER_PASSWORD"),
    );
    if let (Some(username), Some(password)) = credentials {
        info!(
            "Using Docker credentials from DOCKER_USERNAME/PASSWORD env vars for {}",
            registry_host
        );
        return RegistryAuth::Basic(username, password);
    }
    info!(
        "DOCKER_USERNAME and/or DOCKER_PASSWORD not set or empty. Falling back to anonymous auth for {}.",
        registry_host
    );
    RegistryAuth::Anonymous
}
/// Decides whether the push should use monolithic push mode (`true`) or
/// chunked upload (`false`).
///
/// `Always` and `Never` force the answer. `Auto` resolves the registry host of
/// `reference` and defers to `is_registry_requiring_monolithic_push`.
fn determine_use_monolithic_push(policy: &MonolithicPushPolicy, reference: &Reference) -> bool {
    let registry_host = match policy {
        MonolithicPushPolicy::Always => {
            debug!("MonolithicPushPolicy::Always - using monolithic push");
            return true;
        }
        MonolithicPushPolicy::Never => {
            debug!("MonolithicPushPolicy::Never - using chunked upload");
            return false;
        }
        MonolithicPushPolicy::Auto => reference.resolve_registry(),
    };

    if !is_registry_requiring_monolithic_push(registry_host) {
        debug!(
            "Registry {} supports chunked upload - using chunked upload mode",
            registry_host
        );
        return false;
    }

    info!(
        "Registry {} requires monolithic push - enabling monolithic push mode",
        registry_host
    );
    true
}
fn extract_layer_with_whiteouts<R: std::io::Read>(
mut archive: tar::Archive<R>,
target_dir: &Path,
) -> std::io::Result<()> {
for entry_result in archive.entries()? {
let mut entry = entry_result?;
let entry_path = entry.path()?.into_owned();
let file_name = match entry_path.file_name().and_then(|n| n.to_str()) {
Some(name) => name.to_string(),
None => {
entry.unpack_in(target_dir)?;
continue;
}
};
if file_name == ".wh..wh..opq" {
let parent = target_dir.join(entry_path.parent().unwrap_or_else(|| Path::new("")));
if parent.is_dir() {
for child in std_fs::read_dir(&parent)? {
let child = child?;
let child_path = child.path();
if child_path.is_dir() {
std_fs::remove_dir_all(&child_path)?;
} else {
std_fs::remove_file(&child_path)?;
}
}
}
} else if let Some(target_name) = file_name.strip_prefix(".wh.") {
let target_path = target_dir.join(
entry_path
.parent()
.unwrap_or_else(|| Path::new(""))
.join(target_name),
);
let remove_result = if target_path.is_dir() {
std_fs::remove_dir_all(&target_path)
} else {
std_fs::remove_file(&target_path)
};
if let Err(e) = remove_result {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
} else {
if !entry.unpack_in(target_dir)? {
warn!(
path = %entry_path.display(),
"Skipping tar entry: path escapes target directory"
);
}
}
}
Ok(())
}
/// Returns `true` for registry hosts that this module pushes to monolithically.
///
/// Matching hosts are `*-docker.pkg.dev` names, the apex `gcr.io`, and any
/// `*.gcr.io` subdomain. These are presumably Google Artifact Registry and
/// Container Registry endpoints. Matching is an exact, case-sensitive suffix
/// or equality check.
fn is_registry_requiring_monolithic_push(registry_host: &str) -> bool {
    let artifact_registry_style = registry_host.ends_with("-docker.pkg.dev");
    let gcr_style = registry_host == "gcr.io" || registry_host.ends_with(".gcr.io");
    artifact_registry_style || gcr_style
}