use crate::core::types::{OciIndex, OciManifest, PushKind, PushResult};
use std::collections::HashSet;
use std::path::Path;
use std::time::Instant;
#[derive(Debug, Clone)]
pub struct RegistryPushConfig {
pub registry: String,
pub name: String,
pub tag: String,
pub check_existing: bool,
}
#[derive(Debug, Clone)]
pub struct BlobDescriptor {
pub digest: String,
pub size: u64,
pub path: std::path::PathBuf,
pub kind: PushKind,
}
pub fn check_blob_exists(registry: &str, name: &str, digest: &str) -> Result<bool, String> {
let url = format!("https://{registry}/v2/{name}/blobs/{digest}");
let output = std::process::Command::new("curl")
.args([
"-s",
"-o",
"/dev/null",
"-w",
"%{http_code}",
"--head",
&url,
])
.output()
.map_err(|e| format!("curl HEAD: {e}"))?;
let status = String::from_utf8_lossy(&output.stdout);
Ok(status.trim() == "200")
}
pub fn head_check_command(registry: &str, name: &str, digest: &str) -> String {
format!(
"curl -s -o /dev/null -w '%{{http_code}}' --head 'https://{registry}/v2/{name}/blobs/{digest}'"
)
}
pub fn upload_initiate_command(registry: &str, name: &str) -> String {
format!("curl -s -X POST -D - 'https://{registry}/v2/{name}/blobs/uploads/'")
}
pub fn upload_complete_command(upload_url: &str, digest: &str, blob_path: &str) -> String {
format!(
"curl -s --fail-with-body -X PUT -H 'Content-Type: application/octet-stream' \
--data-binary '@{blob_path}' '{upload_url}?digest={digest}'"
)
}
pub fn manifest_put_command(registry: &str, name: &str, tag: &str, manifest_path: &str) -> String {
format!(
"curl -s --fail-with-body -X PUT -H 'Content-Type: application/vnd.oci.image.manifest.v1+json' \
--data-binary '@{manifest_path}' 'https://{registry}/v2/{name}/manifests/{tag}'"
)
}
pub(crate) const CHUNKED_UPLOAD_THRESHOLD: u64 = 64 * 1024 * 1024;
pub(crate) const CHUNK_SIZE: u64 = 16 * 1024 * 1024;
pub fn push_blob(config: &RegistryPushConfig, blob: &BlobDescriptor) -> Result<PushResult, String> {
let start = Instant::now();
if config.check_existing {
let exists = check_blob_exists(&config.registry, &config.name, &blob.digest)?;
if exists {
return Ok(PushResult {
kind: blob.kind,
digest: blob.digest.clone(),
size: blob.size,
existed: true,
duration_secs: 0.0,
});
}
}
let upload_url = initiate_upload(&config.registry, &config.name)?;
if blob.size >= CHUNKED_UPLOAD_THRESHOLD {
push_blob_chunked(&upload_url, blob)?;
} else {
push_blob_monolithic(&upload_url, blob)?;
}
Ok(PushResult {
kind: blob.kind,
digest: blob.digest.clone(),
size: blob.size,
existed: false,
duration_secs: start.elapsed().as_secs_f64(),
})
}
fn initiate_upload(registry: &str, name: &str) -> Result<String, String> {
let output = std::process::Command::new("curl")
.args([
"-s",
"-X",
"POST",
"-D",
"-",
&format!("https://{registry}/v2/{name}/blobs/uploads/"),
])
.output()
.map_err(|e| format!("blob upload initiate: {e}"))?;
let headers = String::from_utf8_lossy(&output.stdout);
parse_location_header(&headers)
.ok_or_else(|| "no Location header in upload response".to_string())
}
pub(crate) fn monolithic_put_args(upload_url: &str, digest: &str, blob_path: &str) -> Vec<String> {
vec![
"-s".into(),
"--fail-with-body".into(),
"-X".into(),
"PUT".into(),
"-H".into(),
"Content-Type: application/octet-stream".into(),
"--data-binary".into(),
format!("@{blob_path}"),
format!("{upload_url}?digest={digest}"),
]
}
fn push_blob_monolithic(upload_url: &str, blob: &BlobDescriptor) -> Result<(), String> {
let blob_path = blob.path.display().to_string();
let args = monolithic_put_args(upload_url, &blob.digest, &blob_path);
let output = std::process::Command::new("curl")
.args(&args)
.output()
.map_err(|e| format!("blob upload complete: {e}"))?;
if !output.status.success() {
return Err(format!(
"blob upload failed (HTTP error): {}",
curl_error_detail(&output)
));
}
Ok(())
}
fn curl_error_detail(output: &std::process::Output) -> String {
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
let body = stdout.trim();
let err = stderr.trim();
match (body.is_empty(), err.is_empty()) {
(false, false) => format!("{err}: {body}"),
(false, true) => body.to_string(),
(true, false) => err.to_string(),
(true, true) => "no response body".to_string(),
}
}
pub(crate) fn push_blob_chunked(upload_url: &str, blob: &BlobDescriptor) -> Result<(), String> {
let blob_path = blob.path.display().to_string();
let total_size = blob.size;
let mut offset: u64 = 0;
let mut current_url = upload_url.to_string();
while offset < total_size {
let end = std::cmp::min(offset + CHUNK_SIZE, total_size) - 1;
let range = format!("{offset}-{end}");
let output = std::process::Command::new("curl")
.args([
"-s",
"--fail-with-body", "-X",
"PATCH",
"-D",
"-",
"-H",
"Content-Type: application/octet-stream",
"-H",
&format!("Content-Range: {range}"),
"-H",
&format!("Content-Length: {}", end - offset + 1),
"-r",
&range,
"--data-binary",
&format!("@{blob_path}"),
¤t_url,
])
.output()
.map_err(|e| format!("chunked upload PATCH: {e}"))?;
if !output.status.success() {
return Err(format!(
"chunked upload failed at range {range} (HTTP error): {}",
curl_error_detail(&output)
));
}
let headers = String::from_utf8_lossy(&output.stdout);
if let Some(loc) = parse_location_header(&headers) {
current_url = loc;
}
offset = end + 1;
}
let output = std::process::Command::new("curl")
.args([
"-s",
"--fail-with-body", "-X",
"PUT",
"-H",
"Content-Type: application/octet-stream",
&format!("{current_url}?digest={}", blob.digest),
])
.output()
.map_err(|e| format!("chunked upload finalize: {e}"))?;
if !output.status.success() {
return Err(format!(
"chunked upload finalize failed (HTTP error): {}",
curl_error_detail(&output)
));
}
Ok(())
}
pub(crate) fn manifest_put_args(manifest_json: &str, url: &str) -> Vec<String> {
vec![
"-s".into(),
"--fail-with-body".into(),
"-X".into(),
"PUT".into(),
"-H".into(),
"Content-Type: application/vnd.oci.image.manifest.v1+json".into(),
"-d".into(),
manifest_json.into(),
url.into(),
]
}
pub fn push_manifest(
config: &RegistryPushConfig,
manifest_json: &str,
digest: &str,
) -> Result<PushResult, String> {
let start = Instant::now();
let url = format!(
"https://{}/v2/{}/manifests/{}",
config.registry, config.name, config.tag
);
let args = manifest_put_args(manifest_json, &url);
let output = std::process::Command::new("curl")
.args(&args)
.output()
.map_err(|e| format!("manifest push: {e}"))?;
if !output.status.success() {
return Err(format!(
"manifest push failed (HTTP error): {}",
curl_error_detail(&output)
));
}
Ok(PushResult {
kind: PushKind::Manifest,
digest: digest.to_string(),
size: manifest_json.len() as u64,
existed: false,
duration_secs: start.elapsed().as_secs_f64(),
})
}
pub fn push_image(oci_dir: &Path, config: &RegistryPushConfig) -> Result<Vec<PushResult>, String> {
let blobs_dir = oci_dir.join("blobs").join("sha256");
if !blobs_dir.is_dir() {
return Err(format!(
"OCI blobs directory not found: {}",
blobs_dir.display()
));
}
let index_path = oci_dir.join("index.json");
if !index_path.exists() {
return Err(format!(
"OCI index.json not found: {}",
index_path.display()
));
}
let blobs = discover_blobs(oci_dir)?;
let mut results = Vec::new();
for kind in [PushKind::Layer, PushKind::Config, PushKind::Manifest] {
for blob in blobs.iter().filter(|b| b.kind == kind) {
let result = push_blob(config, blob)?;
results.push(result);
}
}
Ok(results)
}
struct DigestClassification {
manifests: HashSet<String>,
configs: HashSet<String>,
}
fn classify_digests_from_index(oci_dir: &Path) -> DigestClassification {
let mut result = DigestClassification {
manifests: HashSet::new(),
configs: HashSet::new(),
};
let index_path = oci_dir.join("index.json");
let index_json = match std::fs::read_to_string(&index_path) {
Ok(s) => s,
Err(_) => return result,
};
let index: OciIndex = match serde_json::from_str(&index_json) {
Ok(i) => i,
Err(_) => return result,
};
let blobs_dir = oci_dir.join("blobs").join("sha256");
for m in &index.manifests {
result.manifests.insert(m.digest.clone());
let hash = m.digest.strip_prefix("sha256:").unwrap_or(&m.digest);
let mf_path = blobs_dir.join(hash);
if let Ok(mf_json) = std::fs::read_to_string(&mf_path) {
if let Ok(manifest) = serde_json::from_str::<OciManifest>(&mf_json) {
result.configs.insert(manifest.config.digest.clone());
}
}
}
result
}
pub(crate) fn discover_blobs(oci_dir: &Path) -> Result<Vec<BlobDescriptor>, String> {
let blobs_dir = oci_dir.join("blobs").join("sha256");
if !blobs_dir.is_dir() {
return Ok(Vec::new());
}
let classification = classify_digests_from_index(oci_dir);
let mut blobs = Vec::new();
let entries = std::fs::read_dir(&blobs_dir).map_err(|e| format!("read blobs dir: {e}"))?;
for entry in entries {
let entry = entry.map_err(|e| format!("read blob entry: {e}"))?;
let path = entry.path();
if !path.is_file() {
continue;
}
let name = entry.file_name().to_string_lossy().to_string();
let digest = format!("sha256:{name}");
let size = entry.metadata().map(|m| m.len()).unwrap_or(0);
let kind = if classification.manifests.contains(&digest) {
PushKind::Manifest
} else if classification.configs.contains(&digest) {
PushKind::Config
} else {
PushKind::Layer
};
blobs.push(BlobDescriptor {
digest,
size,
path,
kind,
});
}
Ok(blobs)
}
pub(crate) fn parse_location_header(headers: &str) -> Option<String> {
for line in headers.lines() {
let lower = line.to_lowercase();
if lower.starts_with("location:") {
return Some(line[9..].trim().to_string());
}
}
None
}
pub use super::registry_push_fmt::{format_push_summary, validate_push_config};