use axum::body::{Body, Bytes};
use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
use axum::response::{IntoResponse, Response};
use ferro_blob_store::Digest;
use serde_json::Value;
use crate::error::{OciError, OciErrorCode};
use crate::media_types::{ManifestKind, classify_manifest_media_type};
use crate::reference::{Reference, validate_name};
use crate::registry::ReferrerDescriptor;
use crate::router::AppState;
/// Parses the `<reference>` path segment of a manifest URL into a [`Reference`].
///
/// This is a thin wrapper over `Reference`'s `FromStr` implementation; the
/// parse error is returned unchanged as an [`OciError`].
fn parse_reference(s: &str) -> Result<Reference, OciError> {
    <Reference as std::str::FromStr>::from_str(s)
}
fn map_mutation_error(err: ferro_blob_store::BlobStoreError) -> OciError {
use axum::http::StatusCode;
use ferro_blob_store::BlobStoreError as B;
if let B::Io(io) = &err {
return OciError::new(
OciErrorCode::Unsupported,
format!("registry metadata could not be persisted durably: {io}"),
)
.with_status(StatusCode::INTERNAL_SERVER_ERROR);
}
OciError::from(err)
}
/// Builds the headers shared by manifest responses.
///
/// Inserts, in this order:
/// * `Docker-Content-Digest` — the digest in its display form,
/// * `ETag` — the same digest wrapped in double quotes (only attempted when
///   the digest itself was a valid header value),
/// * `Content-Type` — `media_type`, when it is a valid header value,
/// * `Content-Length` — `size`.
///
/// Values that cannot be encoded as a header value are skipped silently.
fn manifest_response_headers(digest: &Digest, media_type: &str, size: usize) -> HeaderMap {
    let digest_text = digest.to_string();

    // Compute every value up front, then insert in a fixed order.
    let digest_value = HeaderValue::from_str(&digest_text).ok();
    let etag_value = digest_value
        .as_ref()
        .and_then(|_| HeaderValue::from_str(&format!("\"{digest_text}\"")).ok());
    let content_type_value = HeaderValue::from_str(media_type).ok();

    let mut out = HeaderMap::new();
    if let Some(value) = digest_value {
        out.insert("Docker-Content-Digest", value);
    }
    if let Some(value) = etag_value {
        out.insert(header::ETAG, value);
    }
    if let Some(value) = content_type_value {
        out.insert(header::CONTENT_TYPE, value);
    }
    out.insert(header::CONTENT_LENGTH, HeaderValue::from(size as u64));
    out
}
/// Handles `GET /v2/<name>/manifests/<reference>`.
///
/// * An invalid repository name yields the error produced by `validate_name`.
/// * A reference that does not parse, or a manifest the registry does not
///   have, yields the `ManifestUnknown` error from `manifest_not_found`.
/// * A registry failure is converted with `OciError::from`.
/// * Otherwise the manifest is served through `range_or_full_response`, which
///   honours the request's `Range` header.
pub async fn get_manifest(
    state: &AppState,
    name: &str,
    reference_str: &str,
    request_headers: &HeaderMap,
) -> Response {
    if let Err(invalid_name) = validate_name(name) {
        return invalid_name.into_response();
    }

    // An unparseable reference cannot name a stored manifest, so it is
    // reported as "not found" rather than as a syntax error.
    let reference = match parse_reference(reference_str) {
        Ok(parsed) => parsed,
        Err(_) => return manifest_not_found(name, reference_str),
    };

    let lookup = state.registry.get_manifest(name, &reference).await;
    match lookup {
        Err(failure) => OciError::from(failure).into_response(),
        Ok(None) => manifest_not_found(name, reference_str),
        Ok(Some((digest, media_type, body))) => {
            range_or_full_response(request_headers, &digest, &media_type, &body)
        }
    }
}
fn range_or_full_response(
request_headers: &HeaderMap,
digest: &Digest,
media_type: &str,
body: &Bytes,
) -> Response {
let total = body.len();
let raw_range = request_headers
.get(header::RANGE)
.and_then(|v| v.to_str().ok());
let parse_outcome = raw_range.map(parse_byte_range);
match parse_outcome {
None => {
let mut headers = manifest_response_headers(digest, media_type, total);
headers.insert(header::ACCEPT_RANGES, HeaderValue::from_static("bytes"));
(StatusCode::OK, headers, Body::from(body.clone())).into_response()
}
Some(ByteRangeOutcome::Ignore) => {
let mut headers = manifest_response_headers(digest, media_type, total);
headers.insert(header::ACCEPT_RANGES, HeaderValue::from_static("bytes"));
(StatusCode::OK, headers, Body::from(body.clone())).into_response()
}
Some(ByteRangeOutcome::Unsatisfiable) => unsatisfiable_response(total),
Some(ByteRangeOutcome::Range { start, end }) => {
let last = total.saturating_sub(1);
if total == 0 || start > last {
return unsatisfiable_response(total);
}
let clamped_end = end.min(last);
let slice_len = clamped_end - start + 1;
let slice = body.slice(start..=clamped_end);
let mut headers = HeaderMap::new();
let digest_str = digest.to_string();
if let Ok(v) = HeaderValue::from_str(&digest_str) {
headers.insert("Docker-Content-Digest", v);
if let Ok(etag) = HeaderValue::from_str(&format!("\"{digest_str}\"")) {
headers.insert(header::ETAG, etag);
}
}
if let Ok(v) = HeaderValue::from_str(media_type) {
headers.insert(header::CONTENT_TYPE, v);
}
headers.insert(header::CONTENT_LENGTH, HeaderValue::from(slice_len as u64));
headers.insert(header::ACCEPT_RANGES, HeaderValue::from_static("bytes"));
if let Ok(v) = HeaderValue::from_str(&format!("bytes {start}-{clamped_end}/{total}")) {
headers.insert(header::CONTENT_RANGE, v);
}
(StatusCode::PARTIAL_CONTENT, headers, Body::from(slice)).into_response()
}
}
}
/// Builds a body-less `416 Range Not Satisfiable` response.
///
/// `Content-Range: bytes */<total>` tells the client the full length of the
/// representation, and `Accept-Ranges: bytes` is advertised as on every other
/// range-aware response.
fn unsatisfiable_response(total: usize) -> Response {
    let content_range = format!("bytes */{total}");

    let mut out = HeaderMap::new();
    if let Ok(value) = HeaderValue::from_str(&content_range) {
        out.insert(header::CONTENT_RANGE, value);
    }
    out.insert(header::ACCEPT_RANGES, HeaderValue::from_static("bytes"));

    (StatusCode::RANGE_NOT_SATISFIABLE, out).into_response()
}
/// Result of interpreting a raw `Range` header value (produced by
/// `parse_byte_range`, consumed by `range_or_full_response`).
#[derive(Debug, PartialEq, Eq)]
enum ByteRangeOutcome {
/// A single byte range with inclusive bounds and `start <= end`. `end` is
/// not yet clamped to the body length; an open-ended range (`bytes=N-`)
/// carries `usize::MAX`.
Range { start: usize, end: usize },
/// The header parsed as a single range but `start > end`; the caller
/// answers with `416 Range Not Satisfiable`.
Unsatisfiable,
/// The header is not a single parseable `bytes=` range (other unit,
/// multiple ranges, suffix range, non-numeric bounds); the caller serves
/// the full body as if no `Range` header had been sent.
Ignore,
}
/// Interprets the value of a `Range` header.
///
/// Only a single `bytes=<start>-[<end>]` range is understood:
///
/// * `bytes=A-B` with `A <= B` gives `Range { start: A, end: B }`.
/// * `bytes=A-` gives `Range { start: A, end: usize::MAX }`; the caller
///   clamps the end to the body length.
/// * `bytes=A-B` with `A > B` gives `Unsatisfiable`.
/// * Anything else gives `Ignore`: a different unit, a comma-separated list
///   of ranges, a suffix range (`bytes=-N`), a value without `-`, or bounds
///   that do not parse as `usize`.
fn parse_byte_range(raw: &str) -> ByteRangeOutcome {
    let bounds = raw
        .strip_prefix("bytes=")
        // Multi-range requests are not supported.
        .filter(|spec| !spec.contains(','))
        .and_then(|spec| spec.split_once('-'));
    let Some((first, last)) = bounds else {
        return ByteRangeOutcome::Ignore;
    };

    // An empty first position (suffix range) fails this parse and is ignored.
    let Ok(start) = first.parse::<usize>() else {
        return ByteRangeOutcome::Ignore;
    };

    let end = match last {
        // Open-ended range: "until the end of the body".
        "" => usize::MAX,
        digits => match digits.parse::<usize>() {
            Ok(value) => value,
            Err(_) => return ByteRangeOutcome::Ignore,
        },
    };

    if end < start {
        ByteRangeOutcome::Unsatisfiable
    } else {
        ByteRangeOutcome::Range { start, end }
    }
}
pub async fn head_manifest(state: &AppState, name: &str, reference_str: &str) -> Response {
if let Err(e) = validate_name(name) {
return e.into_response();
}
let Ok(reference) = parse_reference(reference_str) else {
return manifest_not_found(name, reference_str);
};
match state.registry.get_manifest(name, &reference).await {
Ok(Some((digest, media_type, body))) => {
let headers = manifest_response_headers(&digest, &media_type, body.len());
(StatusCode::OK, headers).into_response()
}
Ok(None) => manifest_not_found(name, reference_str),
Err(e) => OciError::from(e).into_response(),
}
}
/// Builds the `ManifestUnknown` error response for a manifest that does not
/// exist (or whose reference could not be parsed) in repository `name`.
fn manifest_not_found(name: &str, reference_str: &str) -> Response {
    let detail = format!("manifest {reference_str} not found in {name}");
    OciError::new(OciErrorCode::ManifestUnknown, detail).into_response()
}
/// Handles `PUT /v2/<name>/manifests/<reference>`.
///
/// Validation, in order:
/// 1. repository name (`validate_name`) and reference syntax;
/// 2. `Content-Type` must be present, valid UTF-8 and a media type accepted
///    by `classify_manifest_media_type` (`ManifestInvalid` otherwise);
/// 3. the body must be valid JSON (`ManifestInvalid`);
/// 4. every blob/manifest referenced by the body must already exist
///    (`verify_referenced_blobs`);
/// 5. when pushing by digest, the reference must be a sha256 digest equal to
///    the digest of the uploaded bytes (`DigestInvalid`).
///
/// On success the manifest, and the referrer entry derived from its `subject`
/// when there is one, are stored via `put_manifest_with_referrer`, and
/// `201 Created` is returned with `Location`, `Docker-Content-Digest` and a
/// zero `Content-Length`.
///
/// `OCI-Subject` is sent only when a referrer entry was actually recorded,
/// i.e. when `subject.digest` parsed as a digest. A manifest whose subject
/// digest is malformed is still stored, but the response no longer claims
/// that the subject was processed.
///
/// Storage failures are mapped through `map_mutation_error`.
pub async fn put_manifest(
    state: &AppState,
    name: &str,
    reference_str: &str,
    headers: &HeaderMap,
    body: Bytes,
) -> Response {
    if let Err(e) = validate_name(name) {
        return e.into_response();
    }
    let reference = match parse_reference(reference_str) {
        Ok(r) => r,
        Err(e) => return e.into_response(),
    };

    let Some(content_type) = headers
        .get(header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .map(str::to_owned)
    else {
        return OciError::new(OciErrorCode::ManifestInvalid, "missing Content-Type")
            .into_response();
    };
    let Some(kind) = classify_manifest_media_type(&content_type) else {
        return OciError::new(
            OciErrorCode::ManifestInvalid,
            format!("unsupported manifest media type `{content_type}`"),
        )
        .into_response();
    };

    let parsed: Value = match serde_json::from_slice(&body) {
        Ok(v) => v,
        Err(e) => {
            return OciError::new(
                OciErrorCode::ManifestInvalid,
                format!("manifest is not valid JSON: {e}"),
            )
            .into_response();
        }
    };
    if let Err(e) = verify_referenced_blobs(state, name, &parsed, kind).await {
        return e.into_response();
    }

    // The stored digest is always the sha256 of the exact uploaded bytes; a
    // digest reference must agree with it in both algorithm and value.
    let digest = Digest::sha256_of(&body);
    if let Reference::Digest(declared) = &reference {
        if declared.algo() != digest.algo() {
            return OciError::new(
                OciErrorCode::DigestInvalid,
                format!(
                    "unsupported manifest digest algorithm `{}`: only sha256 is supported",
                    declared.algo()
                ),
            )
            .into_response();
        }
        if declared.hex() != digest.hex() {
            return OciError::new(
                OciErrorCode::DigestInvalid,
                format!("manifest digest mismatch: reference {declared}, computed {digest}"),
            )
            .into_response();
        }
    }

    let body_len = body.len() as u64;
    let referrer = build_referrer(&parsed, &digest, &content_type, body_len);
    // Remember whether a referrer link is being recorded before `referrer`
    // is moved into the registry call.
    let subject_processed = referrer.is_some();

    if let Err(e) = state
        .registry
        .put_manifest_with_referrer(name, &reference, &digest, &content_type, body, referrer)
        .await
    {
        return map_mutation_error(e).into_response();
    }

    let mut out = HeaderMap::new();
    let location = format!("/v2/{name}/manifests/{digest}");
    if let Ok(v) = HeaderValue::from_str(&location) {
        out.insert(header::LOCATION, v);
    }
    if let Ok(v) = HeaderValue::from_str(&digest.to_string()) {
        out.insert("Docker-Content-Digest", v);
    }
    // Echo the subject digest exactly as the client sent it, but only when
    // the referrer link was really stored.
    if subject_processed
        && let Some(subj) = parsed
            .get("subject")
            .and_then(|s| s.get("digest"))
            .and_then(Value::as_str)
        && let Ok(v) = HeaderValue::from_str(subj)
    {
        out.insert("OCI-Subject", v);
    }
    out.insert(header::CONTENT_LENGTH, HeaderValue::from(0u64));
    (StatusCode::CREATED, out).into_response()
}
/// Derives the referrer entry for a manifest that declares a `subject`.
///
/// Returns `None` when the manifest has no `subject.digest` string or when
/// that string does not parse as a [`Digest`]. Otherwise returns the subject
/// digest together with a [`ReferrerDescriptor`] describing the manifest
/// being pushed:
///
/// * `media_type` / `digest` / `size` come from the arguments;
/// * `artifact_type` is the manifest's non-empty `artifactType`, falling back
///   to `config.mediaType`;
/// * `annotations` is the manifest's `annotations` object restricted to
///   string-valued entries (`None` when the field is absent or not an
///   object).
fn build_referrer(
    parsed: &Value,
    digest: &Digest,
    content_type: &str,
    body_len: u64,
) -> Option<(Digest, ReferrerDescriptor)> {
    let subject: Digest = parsed
        .get("subject")?
        .get("digest")?
        .as_str()?
        .parse()
        .ok()?;

    let declared_type = parsed
        .get("artifactType")
        .and_then(Value::as_str)
        .filter(|declared| !declared.is_empty());
    let artifact_type = match declared_type {
        Some(declared) => Some(declared.to_owned()),
        None => parsed
            .get("config")
            .and_then(|config| config.get("mediaType"))
            .and_then(Value::as_str)
            .map(str::to_owned),
    };

    let annotations = parsed
        .get("annotations")
        .and_then(Value::as_object)
        .map(|entries| {
            let mut kept = std::collections::BTreeMap::new();
            for (key, value) in entries.iter() {
                // Non-string annotation values are dropped.
                if let Some(text) = value.as_str() {
                    kept.insert(key.clone(), text.to_owned());
                }
            }
            kept
        });

    Some((
        subject,
        ReferrerDescriptor {
            media_type: content_type.to_owned(),
            digest: digest.clone(),
            size: body_len,
            artifact_type,
            annotations,
        },
    ))
}
pub async fn delete_manifest(state: &AppState, name: &str, reference_str: &str) -> Response {
if let Err(e) = validate_name(name) {
return e.into_response();
}
let reference = match parse_reference(reference_str) {
Ok(r) => r,
Err(e) => return e.into_response(),
};
if reference.is_tag() {
return OciError::new(
OciErrorCode::Unsupported,
"DELETE manifest by tag is not supported; use digest",
)
.with_status(StatusCode::METHOD_NOT_ALLOWED)
.into_response();
}
match state.registry.delete_manifest(name, &reference).await {
Ok(true) => (StatusCode::ACCEPTED, HeaderMap::new()).into_response(),
Ok(false) => OciError::new(
OciErrorCode::ManifestUnknown,
format!("manifest {reference_str} not found in {name}"),
)
.into_response(),
Err(e) => map_mutation_error(e).into_response(),
}
}
/// Checks that everything a manifest points at already exists before the
/// manifest is accepted.
///
/// * Image manifests and artifacts: `config.digest` and every
///   `layers[].digest` must pass `check_blob_present`.
/// * Image indexes: every `manifests[].digest` must parse as a digest
///   (`ManifestInvalid` otherwise) and must exist either as a manifest in
///   repository `name` or as a blob in the blob store
///   (`ManifestBlobUnknown` otherwise).
///
/// Entries without a string `digest` field are skipped, as are missing or
/// mistyped `config` / `layers` / `manifests` fields. Blob-store lookups are
/// made by digest only; the repository name is used only for the manifest
/// lookup.
async fn verify_referenced_blobs(
    state: &AppState,
    name: &str,
    parsed: &Value,
    kind: ManifestKind,
) -> Result<(), OciError> {
    match kind {
        ManifestKind::ImageManifest | ManifestKind::Artifact => {
            let config_digest = parsed
                .get("config")
                .and_then(Value::as_object)
                .and_then(|config| config.get("digest"))
                .and_then(Value::as_str);
            if let Some(raw) = config_digest {
                check_blob_present(state, raw).await?;
            }

            let layer_digests = parsed
                .get("layers")
                .and_then(Value::as_array)
                .into_iter()
                .flatten()
                .filter_map(|layer| layer.get("digest").and_then(Value::as_str));
            for raw in layer_digests {
                check_blob_present(state, raw).await?;
            }
        }
        ManifestKind::ImageIndex => {
            let children = parsed
                .get("manifests")
                .and_then(Value::as_array)
                .into_iter()
                .flatten();
            for child in children {
                let Some(raw) = child.get("digest").and_then(Value::as_str) else {
                    continue;
                };
                let child_digest = raw.parse::<Digest>().map_err(|e| {
                    OciError::new(
                        OciErrorCode::ManifestInvalid,
                        format!("invalid digest in manifests[]: {e}"),
                    )
                })?;

                // Prefer a manifest stored in this repository...
                let stored_as_manifest = state
                    .registry
                    .get_manifest(name, &Reference::Digest(child_digest.clone()))
                    .await
                    .map_err(OciError::from)?
                    .is_some();
                if stored_as_manifest {
                    continue;
                }

                // ...and fall back to a plain blob with that digest.
                let stored_as_blob = state
                    .blob_store
                    .contains(&child_digest)
                    .await
                    .map_err(OciError::from)?;
                if !stored_as_blob {
                    return Err(OciError::new(
                        OciErrorCode::ManifestBlobUnknown,
                        format!("referenced manifest digest {raw} not present"),
                    ));
                }
            }
        }
    }
    Ok(())
}
/// Digest of the OCI "empty descriptor" payload, i.e. the sha256 of the
/// two-byte JSON document `{}`. `check_blob_present` treats this digest as
/// always present, so manifests may reference it without the blob having
/// been uploaded.
const OCI_EMPTY_DESCRIPTOR_DIGEST: &str =
"sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
/// Verifies that the blob named by `digest_str` exists in the blob store.
///
/// The well-known empty-descriptor digest is accepted without a lookup.
///
/// # Errors
/// * `ManifestInvalid` when `digest_str` does not parse as a [`Digest`].
/// * `ManifestBlobUnknown` when the blob store does not contain the digest.
/// * The converted blob-store error when the lookup itself fails.
async fn check_blob_present(state: &AppState, digest_str: &str) -> Result<(), OciError> {
    if digest_str == OCI_EMPTY_DESCRIPTOR_DIGEST {
        return Ok(());
    }

    let digest: Digest = match digest_str.parse() {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(OciError::new(
                OciErrorCode::ManifestInvalid,
                format!("invalid digest `{digest_str}`: {e}"),
            ));
        }
    };

    let found = state
        .blob_store
        .contains(&digest)
        .await
        .map_err(OciError::from)?;
    if found {
        Ok(())
    } else {
        Err(OciError::new(
            OciErrorCode::ManifestBlobUnknown,
            format!("referenced blob {digest_str} not present"),
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::{ByteRangeOutcome, parse_byte_range, range_or_full_response};
    use axum::http::{HeaderMap, StatusCode, header};
    use bytes::Bytes;
    use ferro_blob_store::Digest;

    /// Runs `range_or_full_response` for `payload` with the given `Range`
    /// header value and a fixed `application/octet-stream` media type.
    fn respond_to_range(range: &str, payload: &'static [u8]) -> axum::response::Response {
        let body = Bytes::from_static(payload);
        let digest = Digest::sha256_of(&body);
        let mut request = HeaderMap::new();
        request.insert(header::RANGE, range.parse().expect("range header"));
        range_or_full_response(&request, &digest, "application/octet-stream", &body)
    }

    /// `start == end` is a valid one-byte range; `start > end` is not.
    #[test]
    fn parse_byte_range_equal_bounds_is_satisfiable() {
        let equal_bounds = parse_byte_range("bytes=5-5");
        assert_eq!(equal_bounds, ByteRangeOutcome::Range { start: 5, end: 5 });

        let inverted_bounds = parse_byte_range("bytes=6-5");
        assert_eq!(inverted_bounds, ByteRangeOutcome::Unsatisfiable);
    }

    /// The last byte of the body is addressable; the offset just past it is
    /// not.
    #[test]
    fn range_request_for_last_byte_is_partial_not_unsatisfiable() {
        let last_byte = respond_to_range("bytes=4-4", b"hello");
        assert_eq!(
            last_byte.status(),
            StatusCode::PARTIAL_CONTENT,
            "requesting the last byte is satisfiable (206)"
        );
        let last_byte_headers = last_byte.headers();
        assert_eq!(
            last_byte_headers[header::CONTENT_LENGTH],
            "1",
            "single-byte range has Content-Length 1"
        );
        assert_eq!(
            last_byte_headers[header::CONTENT_RANGE],
            "bytes 4-4/5",
            "Content-Range names the exact byte"
        );

        let past_end = respond_to_range("bytes=5-9", b"hello");
        assert_eq!(
            past_end.status(),
            StatusCode::RANGE_NOT_SATISFIABLE,
            "a start past the end is 416"
        );
    }

    /// `Content-Length` of a 206 counts both inclusive bounds.
    #[test]
    fn multi_byte_range_content_length_is_exact() {
        let partial = respond_to_range("bytes=2-7", b"0123456789");
        assert_eq!(partial.status(), StatusCode::PARTIAL_CONTENT);
        assert_eq!(
            partial.headers()[header::CONTENT_LENGTH],
            "6",
            "bytes=2-7 spans exactly 6 bytes"
        );
    }
}