#![allow(clippy::too_many_lines)]
use std::sync::Arc;
use axum::Router;
use axum::body::{Body, to_bytes};
use axum::http::{Method, Request, StatusCode, header};
use ferro_blob_store::{Digest, InMemoryBlobStore, SharedBlobStore};
use ferro_oci_server::{AppState, InMemoryRegistryMeta, probe_routes, router};
use serde_json::{Value, json};
use tower::ServiceExt;
/// SHA-256 digest of the two-byte JSON document `{}`. The tests in this file
/// expect the registry to serve this blob (body `{}`, `Content-Length: 2`)
/// without it ever having been uploaded, and use it as the config descriptor
/// of the minimal manifests they push.
const EMPTY_DIGEST: &str = "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a";
/// Builds the router under test on top of a fresh in-memory blob store and
/// in-memory registry metadata, merged with the probe routes.
fn app() -> Router {
    let blobs: SharedBlobStore = Arc::new(InMemoryBlobStore::new());
    let meta = Arc::new(InMemoryRegistryMeta::new());
    router(AppState::new(blobs, meta)).merge(probe_routes())
}
/// Drives a single request through a clone of `app` and returns the response
/// status, headers, and fully buffered body.
///
/// # Panics
///
/// Panics if the service call fails or if the body cannot be collected within
/// the 1 MiB (`1 << 20`) limit.
async fn send(app: &Router, req: Request<Body>) -> (StatusCode, axum::http::HeaderMap, Vec<u8>) {
    let response = app.clone().oneshot(req).await.expect("response");
    let (parts, payload) = response.into_parts();
    let bytes = to_bytes(payload, 1 << 20).await.expect("body");
    (parts.status, parts.headers, bytes.to_vec())
}
/// Builds a body-less `GET` request for `uri`.
///
/// # Panics
///
/// Panics if the request cannot be built (for example, an unparseable URI).
fn get(uri: &str) -> Request<Body> {
    let builder = Request::builder().method(Method::GET).uri(uri);
    builder.body(Body::empty()).expect("req")
}
/// Builds a request with HTTP method `m`, target `uri`, and the given `body`.
///
/// # Panics
///
/// Panics if the request cannot be built (for example, an unparseable URI).
fn method(m: Method, uri: &str, body: Body) -> Request<Body> {
    let builder = Request::builder().method(m).uri(uri);
    builder.body(body).expect("req")
}
/// Parses `body` as JSON and asserts that `errors[0].code` equals `expected`.
///
/// # Panics
///
/// Panics if `body` is not valid JSON or the code does not match; the failure
/// message includes the parsed body.
fn assert_error_code(body: &[u8], expected: &str) {
    let v: Value = serde_json::from_slice(body).expect("error json");
    let code = &v["errors"][0]["code"];
    assert_eq!(*code, expected, "body={v}");
}
/// GET of a well-formed digest that was never uploaded must answer 404 with
/// the `BLOB_UNKNOWN` error code.
#[tokio::test]
async fn blob_get_missing_returns_404_blob_unknown() {
    let svc = app();
    let absent = Digest::sha256_of(b"absent").to_string();
    let uri = format!("/v2/repo/blobs/{absent}");

    let (code, _headers, payload) = send(&svc, get(&uri)).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "BLOB_UNKNOWN");
}
/// GET of a blob path whose digest segment is not a digest must answer 400
/// with the `DIGEST_INVALID` error code.
#[tokio::test]
async fn blob_get_invalid_digest_returns_400() {
    let svc = app();
    let request = get("/v2/repo/blobs/not-a-digest");

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "DIGEST_INVALID");
}
/// GET of the empty-descriptor digest on a fresh store must answer 200 with
/// the body `{}`, `Content-Length: 2`, and a `docker-content-digest` header.
#[tokio::test]
async fn blob_get_empty_descriptor_served_synthetically() {
    let svc = app();
    let uri = format!("/v2/repo/blobs/{EMPTY_DIGEST}");

    let (code, hdrs, payload) = send(&svc, get(&uri)).await;

    assert_eq!(code, StatusCode::OK);
    assert_eq!(payload, b"{}");
    assert_eq!(hdrs[header::CONTENT_LENGTH], "2");
    assert!(hdrs.contains_key("docker-content-digest"));
}
/// HEAD of the empty-descriptor digest must answer 200 with no body while
/// still advertising `Content-Length: 2`.
#[tokio::test]
async fn blob_head_empty_descriptor_ok_no_body() {
    let svc = app();
    let uri = format!("/v2/repo/blobs/{EMPTY_DIGEST}");
    let request = method(Method::HEAD, &uri, Body::empty());

    let (code, hdrs, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::OK);
    assert!(payload.is_empty());
    assert_eq!(hdrs[header::CONTENT_LENGTH], "2");
}
/// HEAD of a digest that was never uploaded must answer 404.
#[tokio::test]
async fn blob_head_missing_returns_404() {
    let svc = app();
    let absent = Digest::sha256_of(b"absent-head").to_string();
    let uri = format!("/v2/repo/blobs/{absent}");
    let request = method(Method::HEAD, &uri, Body::empty());

    let (code, _headers, _payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
}
/// Uploads a blob monolithically, deletes it (202), then deletes it again and
/// expects 404 with `BLOB_UNKNOWN`.
#[tokio::test]
async fn blob_delete_existing_then_missing() {
    let svc = app();
    let bytes = b"deletable-bytes";
    let digest = Digest::sha256_of(bytes).to_string();
    let upload_uri = format!("/v2/repo/blobs/uploads/?digest={digest}");
    let blob_uri = format!("/v2/repo/blobs/{digest}");

    // Monolithic upload puts the blob in place.
    let upload = method(Method::POST, &upload_uri, Body::from(&bytes[..]));
    let (code, _headers, _payload) = send(&svc, upload).await;
    assert_eq!(code, StatusCode::CREATED);

    // First delete finds the blob.
    let first_delete = method(Method::DELETE, &blob_uri, Body::empty());
    let (code, _headers, _payload) = send(&svc, first_delete).await;
    assert_eq!(code, StatusCode::ACCEPTED);

    // Second delete must report it as unknown.
    let second_delete = method(Method::DELETE, &blob_uri, Body::empty());
    let (code, _headers, payload) = send(&svc, second_delete).await;
    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "BLOB_UNKNOWN");
}
/// GET of a blob under the repository name `Bad_NAME` must answer 400 with
/// `NAME_INVALID`.
#[tokio::test]
async fn blob_get_invalid_name_returns_400() {
    let svc = app();
    let uri = format!("/v2/Bad_NAME/blobs/{EMPTY_DIGEST}");

    let (code, _headers, payload) = send(&svc, get(&uri)).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "NAME_INVALID");
}
/// A monolithic POST whose `digest` query parameter does not match the body
/// bytes must answer 400 with `DIGEST_INVALID`.
#[tokio::test]
async fn monolithic_upload_digest_mismatch_returns_400() {
    let svc = app();
    let mismatched = Digest::sha256_of(b"something-else").to_string();
    let uri = format!("/v2/repo/blobs/uploads/?digest={mismatched}");
    let request = method(Method::POST, &uri, Body::from(&b"actual-bytes"[..]));

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "DIGEST_INVALID");
}
/// A monolithic POST whose `digest` query parameter is not a digest at all
/// must answer 400 with `DIGEST_INVALID`.
#[tokio::test]
async fn monolithic_upload_bad_digest_string_returns_400() {
    let svc = app();
    let uri = "/v2/repo/blobs/uploads/?digest=not-a-digest";
    let request = method(Method::POST, uri, Body::from(&b"x"[..]));

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "DIGEST_INVALID");
}
/// Walks one upload session through its lifecycle: open (202 + `Location`),
/// status query (204 + `Range`), cancel (204), and a second cancel that must
/// report 404 with `BLOB_UPLOAD_UNKNOWN`.
#[tokio::test]
async fn start_upload_then_get_status_then_cancel() {
    let svc = app();

    // Open the session and capture where it lives.
    let open = method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty());
    let (code, hdrs, _payload) = send(&svc, open).await;
    assert_eq!(code, StatusCode::ACCEPTED);
    let session_uri = hdrs[header::LOCATION].to_str().unwrap().to_owned();
    assert!(session_uri.contains("/blobs/uploads/"));

    // Status query on the live session.
    let (code, hdrs, _payload) = send(&svc, get(&session_uri)).await;
    assert_eq!(code, StatusCode::NO_CONTENT);
    assert!(hdrs.contains_key(header::RANGE));

    // Cancel succeeds once...
    let cancel = method(Method::DELETE, &session_uri, Body::empty());
    let (code, _headers, _payload) = send(&svc, cancel).await;
    assert_eq!(code, StatusCode::NO_CONTENT);

    // ...and the session is unknown afterwards.
    let cancel_again = method(Method::DELETE, &session_uri, Body::empty());
    let (code, _headers, payload) = send(&svc, cancel_again).await;
    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "BLOB_UPLOAD_UNKNOWN");
}
/// A status query for an upload id that was never issued must answer 404 with
/// `BLOB_UPLOAD_UNKNOWN`.
#[tokio::test]
async fn get_status_for_unknown_upload_returns_404() {
    let svc = app();
    let request = get("/v2/repo/blobs/uploads/does-not-exist");

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "BLOB_UPLOAD_UNKNOWN");
}
/// PATCHing a chunk to an upload id that was never issued must answer 404
/// with `BLOB_UPLOAD_UNKNOWN`.
#[tokio::test]
async fn patch_unknown_upload_returns_404() {
    let svc = app();
    let chunk = Body::from(&b"chunk"[..]);
    let request = method(Method::PATCH, "/v2/repo/blobs/uploads/ghost", chunk);

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "BLOB_UPLOAD_UNKNOWN");
}
/// A PATCH whose `Content-Range` starts at offset 50 on a brand-new session
/// must answer 416 with `BLOB_UPLOAD_INVALID`.
#[tokio::test]
async fn patch_out_of_order_content_range_returns_416() {
    let app = app();
    let (status, headers, _b) =
        send(&app, method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty())).await;
    // Fail with a clear message if the session could not be opened, rather
    // than with an opaque header-index panic on the next line.
    assert_eq!(status, StatusCode::ACCEPTED, "start upload session");
    let location = headers[header::LOCATION].to_str().unwrap().to_owned();
    let req = Request::builder()
        .method(Method::PATCH)
        .uri(&location)
        .header(header::CONTENT_RANGE, "50-99")
        .body(Body::from(&b"data"[..]))
        .expect("req");
    let (status, _h, body) = send(&app, req).await;
    assert_eq!(status, StatusCode::RANGE_NOT_SATISFIABLE);
    assert_error_code(&body, "BLOB_UPLOAD_INVALID");
}
/// A PATCH whose `Content-Range` header is the literal `garbage` must answer
/// 400 with `BLOB_UPLOAD_INVALID`.
#[tokio::test]
async fn patch_malformed_content_range_returns_400() {
    let app = app();
    let (status, headers, _b) =
        send(&app, method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty())).await;
    // Fail with a clear message if the session could not be opened, rather
    // than with an opaque header-index panic on the next line.
    assert_eq!(status, StatusCode::ACCEPTED, "start upload session");
    let location = headers[header::LOCATION].to_str().unwrap().to_owned();
    let req = Request::builder()
        .method(Method::PATCH)
        .uri(&location)
        .header(header::CONTENT_RANGE, "garbage")
        .body(Body::from(&b"data"[..]))
        .expect("req");
    let (status, _h, body) = send(&app, req).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_error_code(&body, "BLOB_UPLOAD_INVALID");
}
/// A PATCH claiming the range `0-u64::MAX` with an empty body must be rejected
/// with 416 and `BLOB_UPLOAD_INVALID`; the request completing at all shows the
/// handler did not panic on the extreme range.
#[tokio::test]
async fn patch_full_u64_content_range_with_empty_body_rejected_no_panic() {
    let app = app();
    let (status, headers, _b) =
        send(&app, method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty())).await;
    // Fail with a clear message if the session could not be opened, rather
    // than with an opaque header-index panic on the next line.
    assert_eq!(status, StatusCode::ACCEPTED, "start upload session");
    let location = headers[header::LOCATION].to_str().unwrap().to_owned();
    let req = Request::builder()
        .method(Method::PATCH)
        .uri(&location)
        .header(header::CONTENT_RANGE, format!("0-{}", u64::MAX))
        .body(Body::empty())
        .expect("req");
    let (status, _h, body) = send(&app, req).await;
    assert_eq!(status, StatusCode::RANGE_NOT_SATISFIABLE);
    assert_error_code(&body, "BLOB_UPLOAD_INVALID");
}
/// A finishing PUT on a live session without a `digest` query parameter must
/// answer 400 with `DIGEST_INVALID`.
#[tokio::test]
async fn finish_upload_missing_digest_param_returns_400() {
    let app = app();
    let (status, headers, _b) =
        send(&app, method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty())).await;
    // Fail with a clear message if the session could not be opened, rather
    // than with an opaque header-index panic on the next line.
    assert_eq!(status, StatusCode::ACCEPTED, "start upload session");
    let location = headers[header::LOCATION].to_str().unwrap().to_owned();
    let (status, _h, body) =
        send(&app, method(Method::PUT, &location, Body::from(&b"final"[..]))).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    assert_error_code(&body, "DIGEST_INVALID");
}
/// A finishing PUT against an upload id that was never issued must answer 404
/// with `BLOB_UPLOAD_UNKNOWN`, even when a well-formed digest is supplied.
#[tokio::test]
async fn finish_unknown_upload_returns_404() {
    let svc = app();
    let digest = Digest::sha256_of(b"whatever").to_string();
    let uri = format!("/v2/repo/blobs/uploads/ghost?digest={digest}");
    let request = method(Method::PUT, &uri, Body::empty());

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "BLOB_UPLOAD_UNKNOWN");
}
/// Opening an upload session under the repository name `BAD` must answer 400
/// with `NAME_INVALID`.
#[tokio::test]
async fn upload_invalid_name_returns_400() {
    let svc = app();
    let request = method(Method::POST, "/v2/BAD/blobs/uploads/", Body::empty());

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "NAME_INVALID");
}
/// Opens a blob upload session in repository `name` and returns the value of
/// the response's `Location` header.
///
/// # Panics
///
/// Panics if the POST does not answer 202, or if the `Location` header is
/// missing or is not a valid string.
async fn start_session(app: &Router, name: &str) -> String {
    let uri = format!("/v2/{name}/blobs/uploads/");
    let open = method(Method::POST, &uri, Body::empty());
    let (code, hdrs, _payload) = send(app, open).await;
    assert_eq!(code, StatusCode::ACCEPTED, "start upload session");
    let location = hdrs[header::LOCATION].to_str().unwrap();
    location.to_owned()
}
/// A monolithic POST with the correct digest must answer 201, and the blob
/// must then be retrievable byte-for-byte.
#[tokio::test]
async fn monolithic_upload_matching_digest_is_created() {
    let svc = app();
    let bytes = b"correct-monolithic-bytes";
    let digest = Digest::sha256_of(bytes).to_string();

    let upload_uri = format!("/v2/repo/blobs/uploads/?digest={digest}");
    let upload = method(Method::POST, &upload_uri, Body::from(&bytes[..]));
    let (code, _headers, _payload) = send(&svc, upload).await;
    assert_eq!(code, StatusCode::CREATED, "correct digest ⇒ 201");

    let blob_uri = format!("/v2/repo/blobs/{digest}");
    let (code, _headers, fetched) = send(&svc, get(&blob_uri)).await;
    assert_eq!(code, StatusCode::OK);
    assert_eq!(fetched, bytes, "blob round-trips byte-for-byte");
}
/// A finishing PUT that carries the whole payload as its body must answer 201,
/// and the stored blob must equal that payload.
#[tokio::test]
async fn chunked_finish_appends_final_chunk_body() {
    let svc = app();
    let session_uri = start_session(&svc, "repo").await;
    let bytes = b"finalized-in-one-put";
    let digest = Digest::sha256_of(bytes).to_string();

    let finish_uri = format!("{session_uri}?digest={digest}");
    let finish = method(Method::PUT, &finish_uri, Body::from(&bytes[..]));
    let (code, _headers, _payload) = send(&svc, finish).await;
    assert_eq!(code, StatusCode::CREATED, "final-chunk PUT ⇒ 201");

    let blob_uri = format!("/v2/repo/blobs/{digest}");
    let (code, _headers, fetched) = send(&svc, get(&blob_uri)).await;
    assert_eq!(code, StatusCode::OK);
    assert_eq!(fetched, bytes, "final-chunk bytes were appended and stored");
}
/// A finishing PUT whose `digest` does not match the uploaded bytes must
/// answer 400 with `DIGEST_INVALID`.
#[tokio::test]
async fn chunked_finish_with_wrong_digest_returns_400() {
    let svc = app();
    let session_uri = start_session(&svc, "repo").await;
    let mismatched = Digest::sha256_of(b"some-other-content").to_string();

    let finish_uri = format!("{session_uri}?digest={mismatched}");
    let finish = method(
        Method::PUT,
        &finish_uri,
        Body::from(&b"actually-these-bytes"[..]),
    );
    let (code, _headers, payload) = send(&svc, finish).await;

    assert_eq!(code, StatusCode::BAD_REQUEST, "wrong digest ⇒ 400");
    assert_error_code(&payload, "DIGEST_INVALID");
}
/// Finishing a new blob through a session PUT on an instrumented router must
/// make `/metrics` report the line `ferrooci_storage_blobs 1`.
#[tokio::test]
async fn chunked_finish_increments_blob_gauge_for_new_blob() {
    let blobs: SharedBlobStore = Arc::new(InMemoryBlobStore::new());
    let meta = Arc::new(InMemoryRegistryMeta::new());
    let state = AppState::new(blobs, meta);
    // Take the counter handle before `state` is moved into the router.
    let gauge = state.blob_count_handle();
    let svc = ferro_oci_server::instrument(router(state), ferro_oci_server::Metrics::new(), gauge);

    let session_uri = start_session(&svc, "repo").await;
    let bytes = b"freshly-chunk-finished-blob";
    let digest = Digest::sha256_of(bytes).to_string();
    let finish_uri = format!("{session_uri}?digest={digest}");
    let finish = method(Method::PUT, &finish_uri, Body::from(&bytes[..]));
    let (code, _headers, _payload) = send(&svc, finish).await;
    assert_eq!(code, StatusCode::CREATED);

    let (_code, _headers, scraped) = send(&svc, get("/metrics")).await;
    let text = String::from_utf8(scraped).expect("utf8");
    let counted = text.lines().any(|line| line == "ferrooci_storage_blobs 1");
    assert!(
        counted,
        "a new blob finished via chunked PUT must increment the gauge to 1:\n{text}"
    );
}
/// The catalog of a fresh registry must answer 200 with an empty
/// `repositories` array.
#[tokio::test]
async fn catalog_empty_is_ok_with_no_repositories() {
    let svc = app();

    let (code, _headers, payload) = send(&svc, get("/v2/_catalog")).await;

    assert_eq!(code, StatusCode::OK);
    let catalog: Value = serde_json::from_slice(&payload).expect("json");
    let repos = catalog["repositories"].as_array().expect("arr");
    assert_eq!(repos.len(), 0);
}
/// With three repositories present, `/v2/_catalog?n=1` must return exactly one
/// entry and a `Link` header.
#[tokio::test]
async fn catalog_pagination_emits_link_header() {
    let svc = app();
    for name in ["a-repo", "b-repo", "c-repo"] {
        push_min_manifest(&svc, name, "latest").await;
    }

    let (code, hdrs, payload) = send(&svc, get("/v2/_catalog?n=1")).await;

    assert_eq!(code, StatusCode::OK);
    let page: Value = serde_json::from_slice(&payload).expect("json");
    let repos = page["repositories"].as_array().unwrap();
    assert_eq!(repos.len(), 1);
    assert!(
        hdrs.contains_key(header::LINK),
        "n=1 over 3 repos must emit a next Link"
    );
}
/// With three tags pushed to `tagrepo`, `tags/list?n=1` must return the
/// repository name, exactly one tag, and a `Link` header.
#[tokio::test]
async fn tags_list_pagination_emits_link_header() {
    let svc = app();
    for reference in ["v1", "v2", "v3"] {
        push_min_manifest(&svc, "tagrepo", reference).await;
    }

    let (code, hdrs, payload) = send(&svc, get("/v2/tagrepo/tags/list?n=1")).await;

    assert_eq!(code, StatusCode::OK);
    let page: Value = serde_json::from_slice(&payload).expect("json");
    assert_eq!(page["name"], "tagrepo");
    let tags = page["tags"].as_array().unwrap();
    assert_eq!(tags.len(), 1);
    assert!(hdrs.contains_key(header::LINK));
}
/// Listing tags under the repository name `BAD` must answer 400 with
/// `NAME_INVALID`.
#[tokio::test]
async fn tags_list_invalid_name_returns_400() {
    let svc = app();
    let request = get("/v2/BAD/tags/list");

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "NAME_INVALID");
}
/// Pushes a minimal OCI image manifest (empty-descriptor config, no layers) to
/// `name` under `reference` and returns the SHA-256 digest of the serialized
/// manifest bytes.
///
/// # Panics
///
/// Panics if serialization fails, the request cannot be built, or the PUT does
/// not answer 201.
async fn push_min_manifest(app: &Router, name: &str, reference: &str) -> Digest {
    const MANIFEST_TYPE: &str = "application/vnd.oci.image.manifest.v1+json";

    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": []
    });
    let bytes = serde_json::to_vec(&document).expect("ser");
    // Compute the digest before the bytes are moved into the request body.
    let digest = Digest::sha256_of(&bytes);

    let uri = format!("/v2/{name}/manifests/{reference}");
    let put = Request::builder()
        .method(Method::PUT)
        .uri(uri)
        .header(header::CONTENT_TYPE, MANIFEST_TYPE)
        .body(Body::from(bytes))
        .expect("req");
    let (code, _headers, _payload) = send(app, put).await;
    assert_eq!(code, StatusCode::CREATED, "manifest push for {name}:{reference}");
    digest
}
/// A manifest PUT without a `Content-Type` header must answer 400 with
/// `MANIFEST_INVALID`.
#[tokio::test]
async fn manifest_put_missing_content_type_returns_400() {
    let svc = app();
    let put = Request::builder()
        .method(Method::PUT)
        .uri("/v2/repo/manifests/latest")
        .body(Body::from(&b"{}"[..]))
        .expect("req");

    let (code, _headers, payload) = send(&svc, put).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "MANIFEST_INVALID");
}
/// A manifest PUT declared as `text/plain` must answer 400 with
/// `MANIFEST_INVALID`.
#[tokio::test]
async fn manifest_put_unsupported_media_type_returns_400() {
    let svc = app();
    let put = Request::builder()
        .method(Method::PUT)
        .uri("/v2/repo/manifests/latest")
        .header(header::CONTENT_TYPE, "text/plain")
        .body(Body::from(&b"{}"[..]))
        .expect("req");

    let (code, _headers, payload) = send(&svc, put).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "MANIFEST_INVALID");
}
/// A manifest PUT with the OCI manifest media type but a body that is not
/// valid JSON must answer 400 with `MANIFEST_INVALID`.
#[tokio::test]
async fn manifest_put_invalid_json_returns_400() {
    let svc = app();
    let broken = Body::from(&b"{not json"[..]);
    let put = Request::builder()
        .method(Method::PUT)
        .uri("/v2/repo/manifests/latest")
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(broken)
        .expect("req");

    let (code, _headers, payload) = send(&svc, put).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
    assert_error_code(&payload, "MANIFEST_INVALID");
}
/// A manifest PUT that references a layer digest which was never uploaded
/// must answer 404 with `MANIFEST_BLOB_UNKNOWN`.
#[tokio::test]
async fn manifest_put_missing_referenced_layer_blob_returns_404() {
    let svc = app();
    let missing = Digest::sha256_of(b"never-uploaded-layer").to_string();
    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": [
            {
                "mediaType": "application/vnd.oci.image.layer.v1.tar",
                "digest": missing,
                "size": 10
            }
        ]
    });
    let bytes = serde_json::to_vec(&document).unwrap();
    let put = Request::builder()
        .method(Method::PUT)
        .uri("/v2/repo/manifests/latest")
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(Body::from(bytes))
        .expect("req");

    let (code, _headers, payload) = send(&svc, put).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "MANIFEST_BLOB_UNKNOWN");
}
/// An image index PUT that references a child manifest digest which was never
/// pushed must answer 404 with `MANIFEST_BLOB_UNKNOWN`.
#[tokio::test]
async fn image_index_missing_child_returns_404() {
    let svc = app();
    let missing = Digest::sha256_of(b"absent-child-manifest").to_string();
    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.index.v1+json",
        "manifests": [
            {
                "mediaType": "application/vnd.oci.image.manifest.v1+json",
                "digest": missing,
                "size": 5
            }
        ]
    });
    let bytes = serde_json::to_vec(&document).unwrap();
    let put = Request::builder()
        .method(Method::PUT)
        .uri("/v2/repo/manifests/idx")
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.index.v1+json",
        )
        .body(Body::from(bytes))
        .expect("req");

    let (code, _headers, payload) = send(&svc, put).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "MANIFEST_BLOB_UNKNOWN");
}
/// Deleting a manifest by tag rather than by digest must answer 405 with
/// `UNSUPPORTED`.
#[tokio::test]
async fn manifest_delete_by_tag_returns_405() {
    let svc = app();
    push_min_manifest(&svc, "repo", "latest").await;
    let delete = method(Method::DELETE, "/v2/repo/manifests/latest", Body::empty());

    let (code, _headers, payload) = send(&svc, delete).await;

    assert_eq!(code, StatusCode::METHOD_NOT_ALLOWED);
    assert_error_code(&payload, "UNSUPPORTED");
}
/// Deleting a pushed manifest by digest must answer 202; repeating the delete
/// must answer 404 with `MANIFEST_UNKNOWN`.
#[tokio::test]
async fn manifest_delete_by_digest_then_missing() {
    let svc = app();
    let pushed = push_min_manifest(&svc, "repo", "latest").await;
    let uri = format!("/v2/repo/manifests/{}", pushed.to_string());

    let first = method(Method::DELETE, &uri, Body::empty());
    let (code, _headers, _payload) = send(&svc, first).await;
    assert_eq!(code, StatusCode::ACCEPTED);

    let second = method(Method::DELETE, &uri, Body::empty());
    let (code, _headers, payload) = send(&svc, second).await;
    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "MANIFEST_UNKNOWN");
}
/// Deleting a manifest by the malformed reference `sha256:ZZZZ` must answer
/// 400.
#[tokio::test]
async fn manifest_delete_invalid_reference_returns_400() {
    let svc = app();
    let delete = method(
        Method::DELETE,
        "/v2/repo/manifests/sha256:ZZZZ",
        Body::empty(),
    );

    let (code, _headers, _payload) = send(&svc, delete).await;

    assert_eq!(code, StatusCode::BAD_REQUEST);
}
/// A manifest GET with the `Range` header `items=0-5` must answer 200 with a
/// non-empty body and `Accept-Ranges: bytes`.
#[tokio::test]
async fn manifest_get_unparseable_range_serves_full_body_200() {
    let svc = app();
    push_min_manifest(&svc, "repo", "latest").await;
    let ranged = Request::builder()
        .method(Method::GET)
        .uri("/v2/repo/manifests/latest")
        .header(header::RANGE, "items=0-5")
        .body(Body::empty())
        .expect("req");

    let (code, hdrs, payload) = send(&svc, ranged).await;

    assert_eq!(code, StatusCode::OK);
    assert!(!payload.is_empty());
    assert_eq!(hdrs[header::ACCEPT_RANGES], "bytes");
}
/// A manifest GET with the inverted byte range `bytes=80-10` must answer 416.
#[tokio::test]
async fn manifest_get_inverted_range_returns_416() {
    let svc = app();
    push_min_manifest(&svc, "repo", "latest").await;
    let ranged = Request::builder()
        .method(Method::GET)
        .uri("/v2/repo/manifests/latest")
        .header(header::RANGE, "bytes=80-10")
        .body(Body::empty())
        .expect("req");

    let (code, _headers, _payload) = send(&svc, ranged).await;

    assert_eq!(code, StatusCode::RANGE_NOT_SATISFIABLE);
}
/// POSTing to the tags-list endpoint must answer 405 with `UNSUPPORTED`.
#[tokio::test]
async fn tags_list_with_post_is_405() {
    let svc = app();
    let post = method(Method::POST, "/v2/repo/tags/list", Body::empty());

    let (code, _headers, payload) = send(&svc, post).await;

    assert_eq!(code, StatusCode::METHOD_NOT_ALLOWED);
    assert_error_code(&payload, "UNSUPPORTED");
}
/// A `/v2/` path with the unrecognised suffix `bogus/segment` must answer 404
/// with `NAME_UNKNOWN`.
#[tokio::test]
async fn unroutable_v2_suffix_returns_name_unknown() {
    let svc = app();
    let request = get("/v2/repo/bogus/segment");

    let (code, _headers, payload) = send(&svc, request).await;

    assert_eq!(code, StatusCode::NOT_FOUND);
    assert_error_code(&payload, "NAME_UNKNOWN");
}
/// POSTing to the referrers endpoint must answer 405.
#[tokio::test]
async fn referrers_with_post_is_405() {
    let svc = app();
    let uri = format!("/v2/repo/referrers/{EMPTY_DIGEST}");
    let post = method(Method::POST, &uri, Body::empty());

    let (code, _headers, _payload) = send(&svc, post).await;

    assert_eq!(code, StatusCode::METHOD_NOT_ALLOWED);
}
/// Each of `/live`, `/ready`, and `/healthz` must answer 200.
#[tokio::test]
async fn probe_routes_live_ready_healthz() {
    let svc = app();
    let probes = ["/live", "/ready", "/healthz"];
    for uri in probes {
        let (code, _headers, _payload) = send(&svc, get(uri)).await;
        assert_eq!(code, StatusCode::OK, "GET {uri}");
    }
}
/// A manifest PUT addressed by a digest that does not match the body must
/// answer 400 with `DIGEST_INVALID`.
#[tokio::test]
async fn manifest_put_by_digest_mismatch_is_rejected() {
    let svc = app();
    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": []
    });
    let bytes = serde_json::to_vec(&document).expect("ser");
    // Digest of unrelated content, so it cannot match `bytes`.
    let mismatched = Digest::sha256_of(b"a completely different payload").to_string();
    let uri = format!("/v2/repo/manifests/{mismatched}");
    let put = Request::builder()
        .method(Method::PUT)
        .uri(uri)
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(Body::from(bytes))
        .expect("req");

    let (code, _headers, payload) = send(&svc, put).await;

    assert_eq!(code, StatusCode::BAD_REQUEST, "mismatched by-digest PUT");
    assert_error_code(&payload, "DIGEST_INVALID");
}
/// A manifest PUT addressed by a `sha512:` reference (128 `a` characters) must
/// answer 400 with `DIGEST_INVALID` rather than being accepted.
#[tokio::test]
async fn manifest_put_by_non_sha256_digest_reference_is_rejected() {
    let svc = app();
    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": []
    });
    let bytes = serde_json::to_vec(&document).expect("ser");
    let hex = "a".repeat(128);
    let reference = format!("sha512:{hex}");
    let uri = format!("/v2/repo/manifests/{reference}");
    let put = Request::builder()
        .method(Method::PUT)
        .uri(uri)
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(Body::from(bytes))
        .expect("req");

    let (code, _headers, payload) = send(&svc, put).await;

    assert_eq!(
        code,
        StatusCode::BAD_REQUEST,
        "non-sha256 by-digest PUT must be rejected, not 201"
    );
    assert_error_code(&payload, "DIGEST_INVALID");
}
/// A manifest PUT addressed by the SHA-256 digest of its own body must answer
/// 201 and echo that digest in `docker-content-digest`.
#[tokio::test]
async fn manifest_put_by_matching_digest_succeeds() {
    let svc = app();
    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": []
    });
    let bytes = serde_json::to_vec(&document).expect("ser");
    let digest = Digest::sha256_of(&bytes).to_string();
    let uri = format!("/v2/repo/manifests/{digest}");
    let put = Request::builder()
        .method(Method::PUT)
        .uri(uri)
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(Body::from(bytes))
        .expect("req");

    let (code, hdrs, _payload) = send(&svc, put).await;

    assert_eq!(code, StatusCode::CREATED, "matching by-digest PUT");
    assert_eq!(hdrs["docker-content-digest"], digest);
}
/// Builds a router like `app()`, but constructs the state through
/// `AppState::with_max_upload_session_bytes` with `cap` as the limit.
fn app_with_upload_cap(cap: u64) -> Router {
    let blobs: SharedBlobStore = Arc::new(InMemoryBlobStore::new());
    let meta = Arc::new(InMemoryRegistryMeta::new());
    let capped = AppState::with_max_upload_session_bytes(blobs, meta, cap);
    router(capped).merge(probe_routes())
}
/// With an 8-byte session cap, a first 5-byte chunk is accepted (202), a
/// second 5-byte chunk is rejected with 413 and `BLOB_UPLOAD_INVALID`, and the
/// session then reads as unknown (404, `BLOB_UPLOAD_UNKNOWN`). Also pins the
/// exported default cap `MAX_UPLOAD_SESSION_BYTES` at 4 GiB.
#[tokio::test]
async fn chunked_upload_past_session_cap_is_rejected_and_buffer_dropped() {
    let app = app_with_upload_cap(8);
    let (status, headers, _b) =
        send(&app, method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty())).await;
    // Fail with a clear message if the session could not be opened, rather
    // than with an opaque header-index panic on the next line.
    assert_eq!(status, StatusCode::ACCEPTED, "start upload session");
    let location = headers[header::LOCATION].to_str().unwrap().to_owned();

    // First chunk: 5 bytes, under the 8-byte cap.
    let req = Request::builder()
        .method(Method::PATCH)
        .uri(&location)
        .header(header::CONTENT_RANGE, "0-4")
        .body(Body::from(&b"hello"[..]))
        .expect("req");
    let (status, _h, _b) = send(&app, req).await;
    assert_eq!(status, StatusCode::ACCEPTED, "first sub-cap chunk");

    // Second chunk would bring the session to 10 bytes, over the cap.
    let req = Request::builder()
        .method(Method::PATCH)
        .uri(&location)
        .header(header::CONTENT_RANGE, "5-9")
        .body(Body::from(&b"world"[..]))
        .expect("req");
    let (status, _h, body) = send(&app, req).await;
    assert_eq!(
        status,
        StatusCode::PAYLOAD_TOO_LARGE,
        "over-cap chunk rejected"
    );
    assert_error_code(&body, "BLOB_UPLOAD_INVALID");

    // The overflowing session must no longer be queryable.
    let (status, _h, body) = send(&app, get(&location)).await;
    assert_eq!(status, StatusCode::NOT_FOUND, "session dropped after overflow");
    assert_error_code(&body, "BLOB_UPLOAD_UNKNOWN");

    assert_eq!(
        ferro_oci_server::MAX_UPLOAD_SESSION_BYTES,
        4 * 1024 * 1024 * 1024
    );
}
/// With `max_sessions: 2`, the first two session-opening POSTs answer 202 and
/// the third must answer 429 with `TOOMANYREQUESTS`.
#[tokio::test]
async fn opening_more_sessions_than_cap_returns_429() {
    use ferro_oci_server::SessionLimits;

    let limits = SessionLimits {
        max_sessions: 2,
        idle_ttl: std::time::Duration::from_secs(3600),
    };
    let blobs: SharedBlobStore = Arc::new(InMemoryBlobStore::new());
    let meta = Arc::new(ferro_oci_server::InMemoryRegistryMeta::with_session_limits(limits));
    let svc = router(AppState::new(blobs, meta)).merge(probe_routes());

    // Fill the session table up to the cap.
    for _ in 0..2 {
        let open = method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty());
        let (code, _headers, _payload) = send(&svc, open).await;
        assert_eq!(code, StatusCode::ACCEPTED, "session under the cap");
    }

    // One more must be refused.
    let overflow = method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty());
    let (code, _headers, payload) = send(&svc, overflow).await;
    assert_eq!(
        code,
        StatusCode::TOO_MANY_REQUESTS,
        "session over the cap must be rejected with 429"
    );
    assert_error_code(&payload, "TOOMANYREQUESTS");
}
/// With a zero `idle_ttl`, a session that was just opened (202) must already
/// read as unknown on the next status query (404, `BLOB_UPLOAD_UNKNOWN`).
#[tokio::test]
async fn idle_session_is_evicted_and_reads_as_unknown() {
    use ferro_oci_server::SessionLimits;

    let limits = SessionLimits {
        max_sessions: 16,
        idle_ttl: std::time::Duration::from_secs(0),
    };
    let blobs: SharedBlobStore = Arc::new(InMemoryBlobStore::new());
    let meta = Arc::new(ferro_oci_server::InMemoryRegistryMeta::with_session_limits(limits));
    let svc = router(AppState::new(blobs, meta)).merge(probe_routes());

    let open = method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty());
    let (code, hdrs, _payload) = send(&svc, open).await;
    assert_eq!(code, StatusCode::ACCEPTED);
    let session_uri = hdrs[header::LOCATION].to_str().unwrap().to_owned();

    let (code, _headers, payload) = send(&svc, get(&session_uri)).await;
    assert_eq!(code, StatusCode::NOT_FOUND, "expired session evicted");
    assert_error_code(&payload, "BLOB_UPLOAD_UNKNOWN");
}
/// A manifest padded with a 3 MiB annotation (so the body exceeds 2 MiB) must
/// still be accepted with 201.
#[tokio::test]
async fn manifest_put_3mib_succeeds_past_axum_default_limit() {
    let svc = app();
    let padding = "x".repeat(3 * 1024 * 1024);
    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": [],
        "annotations": { "com.example.padding": padding }
    });
    let bytes = serde_json::to_vec(&document).expect("ser");
    // Guard the premise of the test: the body really is above 2 MiB.
    assert!(bytes.len() > 2 * 1024 * 1024, "manifest exceeds 2 MiB");

    let put = Request::builder()
        .method(Method::PUT)
        .uri("/v2/repo/manifests/latest")
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(Body::from(bytes))
        .expect("req");
    let (code, _headers, _payload) = send(&svc, put).await;

    assert_eq!(
        code,
        StatusCode::CREATED,
        "3 MiB manifest accepted past the 2 MiB default"
    );
}
/// A PATCH declaring the range `0-999999` while sending a one-byte body must
/// answer 416 with `BLOB_UPLOAD_INVALID`.
#[tokio::test]
async fn patch_content_range_length_mismatch_is_rejected() {
    let app = app();
    let (status, headers, _b) =
        send(&app, method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty())).await;
    // Fail with a clear message if the session could not be opened, rather
    // than with an opaque header-index panic on the next line.
    assert_eq!(status, StatusCode::ACCEPTED, "start upload session");
    let location = headers[header::LOCATION].to_str().unwrap().to_owned();
    let req = Request::builder()
        .method(Method::PATCH)
        .uri(&location)
        .header(header::CONTENT_RANGE, "0-999999")
        .body(Body::from(&b"x"[..]))
        .expect("req");
    let (status, _h, body) = send(&app, req).await;
    assert_eq!(
        status,
        StatusCode::RANGE_NOT_SATISFIABLE,
        "range/body length mismatch"
    );
    assert_error_code(&body, "BLOB_UPLOAD_INVALID");
}
/// A PATCH declaring the range `0-4` with a matching five-byte body must
/// answer 202.
#[tokio::test]
async fn patch_content_range_length_match_is_accepted() {
    let app = app();
    let (status, headers, _b) =
        send(&app, method(Method::POST, "/v2/repo/blobs/uploads/", Body::empty())).await;
    // Fail with a clear message if the session could not be opened, rather
    // than with an opaque header-index panic on the next line.
    assert_eq!(status, StatusCode::ACCEPTED, "start upload session");
    let location = headers[header::LOCATION].to_str().unwrap().to_owned();
    let req = Request::builder()
        .method(Method::PATCH)
        .uri(&location)
        .header(header::CONTENT_RANGE, "0-4")
        .body(Body::from(&b"hello"[..]))
        .expect("req");
    let (status, _h, _b) = send(&app, req).await;
    assert_eq!(status, StatusCode::ACCEPTED, "matching range/body length");
}
/// Builds a router whose registry metadata is created with
/// `with_persistence` over a temp directory in which the metadata file name is
/// occupied by a non-empty directory. The `TempDir` is returned so the caller
/// keeps the directory alive for the duration of the test.
// NOTE(review): the intent appears to be that every metadata persist attempt
// fails because the target path is a directory — confirm against the
// persistence implementation.
fn app_with_failing_persist() -> (Router, tempfile::TempDir) {
    let scratch = tempfile::TempDir::new().expect("tempdir");

    // Occupy the metadata path with a directory that contains a file.
    let blocker = scratch.path().join(ferro_oci_server::METADATA_FILE_NAME);
    std::fs::create_dir(&blocker).expect("metadata.json as dir");
    std::fs::write(blocker.join("child"), b"x").expect("child");

    let blobs: SharedBlobStore = Arc::new(InMemoryBlobStore::new());
    let meta = Arc::new(InMemoryRegistryMeta::with_persistence(scratch.path()));
    let svc = router(AppState::new(blobs, meta)).merge(probe_routes());
    (svc, scratch)
}
/// On the router from `app_with_failing_persist`, a manifest PUT carrying a
/// `subject` must answer 500 and leave nothing behind: the manifest must not
/// resolve by tag or by digest, the subject's referrers list must be empty,
/// and the catalog must be empty.
#[tokio::test]
async fn manifest_put_with_subject_persist_failure_rolls_back_all_and_returns_500() {
    // `_tmp` keeps the temp directory alive until the test ends.
    let (app, _tmp) = app_with_failing_persist();
    let subject_digest = Digest::sha256_of(b"the-subject-manifest").to_string();
    let manifest = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "artifactType": "application/spdx+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": [],
        "subject": {
            "mediaType": "application/vnd.oci.image.manifest.v1+json",
            "digest": subject_digest,
            "size": 7
        }
    });
    let body = serde_json::to_vec(&manifest).expect("ser");
    let digest = Digest::sha256_of(&body).to_string();
    let req = Request::builder()
        .method(Method::PUT)
        .uri("/v2/lib/withsub/manifests/latest")
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(Body::from(body))
        .expect("req");
    let (status, _h, _b) = send(&app, req).await;
    assert_eq!(
        status,
        StatusCode::INTERNAL_SERVER_ERROR,
        "persist failure on a subject PUT must be 500"
    );

    let (tag_status, _h, _b) = send(&app, get("/v2/lib/withsub/manifests/latest")).await;
    assert_eq!(
        tag_status,
        StatusCode::NOT_FOUND,
        "rolled-back manifest must not resolve by tag"
    );

    let (digest_status, _h, _b) =
        send(&app, get(&format!("/v2/lib/withsub/manifests/{digest}"))).await;
    assert_eq!(
        digest_status,
        StatusCode::NOT_FOUND,
        "rolled-back manifest must not resolve by digest"
    );

    let (ref_status, _h, ref_body) =
        send(&app, get(&format!("/v2/lib/withsub/referrers/{subject_digest}"))).await;
    assert_eq!(ref_status, StatusCode::OK, "referrers list is always 200");
    let v: Value = serde_json::from_slice(&ref_body).expect("referrers json");
    assert_eq!(
        v["manifests"].as_array().map_or(0, Vec::len),
        0,
        "no referrer may survive the rolled-back PUT: {v}"
    );

    let (cat_status, _h, cat_body) = send(&app, get("/v2/_catalog")).await;
    // Without this check an error response (which has no `repositories` key)
    // would satisfy the `map_or(0, …)` emptiness assertion below vacuously.
    assert_eq!(cat_status, StatusCode::OK, "catalog listing must succeed");
    let cat: Value = serde_json::from_slice(&cat_body).expect("catalog json");
    assert_eq!(
        cat["repositories"].as_array().map_or(0, Vec::len),
        0,
        "failed first publish must leave catalog empty: {cat}"
    );
}
/// On the default in-memory app, a manifest PUT carrying a `subject` must
/// answer 201 with an `oci-subject` header equal to the subject digest, must
/// resolve by tag, and must appear as the only entry in the subject's
/// referrers list.
#[tokio::test]
async fn manifest_put_with_subject_success_registers_manifest_and_referrer() {
    let svc = app();
    let subject_digest = Digest::sha256_of(b"the-subject-manifest").to_string();
    let document = json!({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.manifest.v1+json",
        "artifactType": "application/spdx+json",
        "config": {
            "mediaType": "application/vnd.oci.image.config.v1+json",
            "digest": EMPTY_DIGEST,
            "size": 2
        },
        "layers": [],
        "subject": {
            "mediaType": "application/vnd.oci.image.manifest.v1+json",
            "digest": subject_digest,
            "size": 7
        }
    });
    let bytes = serde_json::to_vec(&document).expect("ser");
    let digest = Digest::sha256_of(&bytes).to_string();

    // Push the manifest under a tag.
    let put = Request::builder()
        .method(Method::PUT)
        .uri("/v2/lib/withsub/manifests/latest")
        .header(
            header::CONTENT_TYPE,
            "application/vnd.oci.image.manifest.v1+json",
        )
        .body(Body::from(bytes))
        .expect("req");
    let (code, hdrs, _payload) = send(&svc, put).await;
    assert_eq!(code, StatusCode::CREATED, "subject PUT ⇒ 201");
    assert_eq!(
        hdrs["oci-subject"], subject_digest,
        "OCI-Subject header surfaces the subject"
    );

    // The tag resolves.
    let (tag_code, _headers, _payload) =
        send(&svc, get("/v2/lib/withsub/manifests/latest")).await;
    assert_eq!(tag_code, StatusCode::OK, "manifest resolves by tag");

    // The subject lists exactly this manifest as a referrer.
    let referrers_uri = format!("/v2/lib/withsub/referrers/{subject_digest}");
    let (ref_code, _headers, ref_payload) = send(&svc, get(&referrers_uri)).await;
    assert_eq!(ref_code, StatusCode::OK);
    let v: Value = serde_json::from_slice(&ref_payload).expect("referrers json");
    let entries = v["manifests"].as_array().expect("manifests array");
    assert_eq!(entries.len(), 1, "exactly one referrer registered: {v}");
    assert_eq!(entries[0]["digest"], digest, "referrer points at the PUT manifest");
}