#![doc = include_str!("../README.md")]
pub mod auth;
pub mod hooks;
pub mod storage;
#[cfg(any(feature = "test-support", test))]
pub mod test_support;
#[cfg(test)]
mod tests;
mod types;
mod www_authenticate;
use std::{
fmt::{self, Display},
io,
path::PathBuf,
str::FromStr,
sync::Arc,
};
use self::{
auth::ValidCredentials,
storage::{FilesystemStorage, ImageLocation, RegistryStorage},
types::{ImageManifest, OciError, OciErrors},
};
use auth::{MissingPermission, Permissions};
use axum::{
body::Body,
extract::{Path, Query, State},
http::{
header::{CONTENT_LENGTH, CONTENT_TYPE, LOCATION, RANGE},
StatusCode,
},
response::{IntoResponse, Response},
routing::{get, head, patch, post, put},
Router,
};
use futures::stream::StreamExt;
use hex::FromHex;
use serde::{Deserialize, Deserializer, Serialize};
use storage::Reference;
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;
use tracing::info;
use uuid::Uuid;
pub(crate) use {
auth::{AuthProvider, Unverified},
hooks::RegistryHooks,
storage::{FilesystemStorageError, ManifestReference},
};
#[derive(Debug, Error)]
pub enum RegistryError {
#[error("missing item")]
NotFound,
#[error("permission denied")]
PermissionDenied(#[from] MissingPermission),
#[error(transparent)]
Storage(#[from] storage::Error),
#[error("could not parse manifest")]
ParseManifest(serde_json::Error),
#[error("feature not supported: {0}")]
NotSupported(&'static str),
#[error("error parsing content length")]
ContentLengthMalformed(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("failed to read incoming data stream")]
IncomingReadFailed(#[source] axum::Error),
#[error("local write failed")]
LocalWriteFailed(#[source] io::Error),
#[error("axum http error")]
AxumHttp(#[from] axum::http::Error),
}
impl IntoResponse for RegistryError {
#[inline(always)]
fn into_response(self) -> Response {
match self {
RegistryError::NotFound => (
StatusCode::NOT_FOUND,
OciErrors::single(OciError::new(types::ErrorCode::BlobUnknown)),
)
.into_response(),
RegistryError::PermissionDenied(_) => (
StatusCode::FORBIDDEN,
"access to request resource was denied",
)
.into_response(),
RegistryError::Storage(err) => err.into_response(),
RegistryError::ParseManifest(err) => (
StatusCode::BAD_REQUEST,
format!("could not parse manifest: {}", err),
)
.into_response(),
RegistryError::NotSupported(feature) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("feature not supported: {}", feature),
)
.into_response(),
RegistryError::ContentLengthMalformed(err) => (
StatusCode::BAD_REQUEST,
format!("invalid content length value: {}", err),
)
.into_response(),
RegistryError::IncomingReadFailed(_err) => (
StatusCode::INTERNAL_SERVER_ERROR,
"could not read input stream",
)
.into_response(),
RegistryError::LocalWriteFailed(_err) => (
StatusCode::INTERNAL_SERVER_ERROR,
"could not write image locally",
)
.into_response(),
RegistryError::AxumHttp(_err) => (
StatusCode::INTERNAL_SERVER_ERROR,
"error building axum HTTP response",
)
.into_response(),
}
}
}
pub struct ContainerRegistry {
realm: String,
auth_provider: Arc<dyn AuthProvider>,
storage: Box<dyn RegistryStorage>,
hooks: Box<dyn RegistryHooks>,
}
impl ContainerRegistry {
pub fn builder() -> ContainerRegistryBuilder {
ContainerRegistryBuilder::default()
}
pub fn make_router(self: Arc<ContainerRegistry>) -> Router {
Router::new()
.route("/v2/", get(index_v2))
.route("/v2/:repository/:image/blobs/:digest", head(blob_check))
.route("/v2/:repository/:image/blobs/:digest", get(blob_get))
.route("/v2/:repository/:image/blobs/uploads/", post(upload_new))
.route(
"/v2/:repository/:image/uploads/:upload",
patch(upload_add_chunk),
)
.route(
"/v2/:repository/:image/uploads/:upload",
put(upload_finalize),
)
.route(
"/v2/:repository/:image/manifests/:reference",
put(manifest_put),
)
.route(
"/v2/:repository/:image/manifests/:reference",
get(manifest_get),
)
.with_state(self)
}
}
#[derive(Default)]
pub struct ContainerRegistryBuilder {
storage: Option<PathBuf>,
hooks: Option<Box<dyn RegistryHooks>>,
auth_provider: Option<Arc<dyn AuthProvider>>,
}
impl ContainerRegistryBuilder {
pub fn auth_provider(mut self, auth_provider: Arc<dyn AuthProvider>) -> Self {
self.auth_provider = Some(auth_provider);
self
}
pub fn hooks(mut self, hooks: Box<dyn RegistryHooks>) -> Self {
self.hooks = Some(hooks);
self
}
pub fn storage<P>(mut self, storage: P) -> Self
where
P: Into<PathBuf>,
{
self.storage = Some(storage.into());
self
}
pub fn build(mut self) -> Result<Arc<ContainerRegistry>, FilesystemStorageError> {
let storage_path = self
.storage
.expect("attempted to construct registry with no storage path");
let storage = Box::new(FilesystemStorage::new(storage_path)?);
let auth_provider = self
.auth_provider
.take()
.unwrap_or_else(|| Arc::new(Permissions::NoAccess));
let hooks = self.hooks.take().unwrap_or_else(|| Box::new(()));
Ok(Arc::new(ContainerRegistry {
realm: "ContainerRegistry".to_string(),
auth_provider,
storage,
hooks,
}))
}
}
async fn index_v2(
State(registry): State<Arc<ContainerRegistry>>,
unverified: Unverified,
) -> Response<Body> {
let realm = ®istry.realm;
if registry
.auth_provider
.check_credentials(&unverified)
.await
.is_some()
{
return Response::builder()
.status(StatusCode::OK)
.header("WWW-Authenticate", format!("Basic realm=\"{realm}\""))
.body(Body::empty())
.unwrap();
}
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("WWW-Authenticate", format!("Basic realm=\"{realm}\""))
.body(Body::empty())
.unwrap()
}
async fn blob_check(
State(registry): State<Arc<ContainerRegistry>>,
Path((_, _, image)): Path<(String, String, ImageDigest)>,
creds: ValidCredentials,
) -> Result<Response, RegistryError> {
registry
.auth_provider
.blob_permissions(&creds, &image)
.await
.require_read()?;
if let Some(metadata) = registry.storage.get_blob_metadata(image.digest).await? {
Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_LENGTH, metadata.size())
.header("Docker-Content-Digest", image.to_string())
.header(CONTENT_TYPE, "application/octet-stream")
.body(Body::empty())
.unwrap())
} else {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap())
}
}
async fn blob_get(
State(registry): State<Arc<ContainerRegistry>>,
Path((_, _, image)): Path<(String, String, ImageDigest)>,
creds: ValidCredentials,
) -> Result<Response, RegistryError> {
registry
.auth_provider
.blob_permissions(&creds, &image)
.await
.require_read()?;
let reader = registry
.storage
.get_blob_reader(image.digest)
.await?
.ok_or(RegistryError::NotFound)?;
let stream = ReaderStream::new(reader);
let body = Body::from_stream(stream);
Ok(Response::builder()
.status(StatusCode::OK)
.body(body)
.expect("Building a streaming response with body works. qed"))
}
async fn upload_new(
State(registry): State<Arc<ContainerRegistry>>,
Path(location): Path<ImageLocation>,
creds: ValidCredentials,
) -> Result<UploadState, RegistryError> {
registry
.auth_provider
.image_permissions(&creds, &location)
.await
.require_write()?;
let upload = registry.storage.begin_new_upload().await?;
Ok(UploadState {
location,
completed: None,
upload,
})
}
fn mk_upload_location(location: &ImageLocation, uuid: Uuid) -> String {
let repository = &location.repository();
let image = &location.image();
format!("/v2/{repository}/{image}/uploads/{uuid}")
}
fn mk_manifest_location(location: &ImageLocation, reference: &Reference) -> String {
let repository = &location.repository();
let image = &location.image();
format!("/v2/{repository}/{image}/manifests/{reference}")
}
#[derive(Debug)]
struct UploadState {
location: ImageLocation,
completed: Option<u64>,
upload: Uuid,
}
impl IntoResponse for UploadState {
fn into_response(self) -> Response {
let mut builder = Response::builder()
.header(LOCATION, mk_upload_location(&self.location, self.upload))
.header(CONTENT_LENGTH, 0)
.header("Docker-Upload-UUID", self.upload.to_string());
if let Some(completed) = self.completed {
builder = builder
.header(RANGE, format!("0-{}", completed))
.status(StatusCode::ACCEPTED)
} else {
builder = builder
.header(CONTENT_LENGTH, 0)
.status(StatusCode::ACCEPTED);
}
builder.body(Body::empty()).unwrap()
}
}
#[derive(Copy, Clone, Debug, Deserialize)]
struct UploadId {
upload: Uuid,
}
#[derive(Debug)]
pub struct ImageDigest {
digest: storage::Digest,
}
impl Serialize for ImageDigest {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let full = format!("sha256:{}", self.digest);
full.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for ImageDigest {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let raw = <String>::deserialize(deserializer)?;
raw.parse().map_err(serde::de::Error::custom)
}
}
impl ImageDigest {
#[inline(always)]
pub const fn new(digest: storage::Digest) -> Self {
Self { digest }
}
pub fn digest(&self) -> storage::Digest {
self.digest
}
}
#[derive(Debug, Error)]
pub enum ImageDigestParseError {
#[error("wrong length")]
WrongLength,
#[error("wrong prefix")]
WrongPrefix,
#[error("hex decoding error")]
HexDecodeError,
}
impl FromStr for ImageDigest {
type Err = ImageDigestParseError;
fn from_str(raw: &str) -> Result<Self, Self::Err> {
const SHA256_LEN: usize = 32;
const PREFIX_LEN: usize = 7;
const DIGEST_HEX_LEN: usize = SHA256_LEN * 2;
if raw.len() != PREFIX_LEN + DIGEST_HEX_LEN {
return Err(ImageDigestParseError::WrongLength);
}
if !raw.starts_with("sha256:") {
return Err(ImageDigestParseError::WrongPrefix);
}
let hex_encoded = &raw[PREFIX_LEN..];
debug_assert_eq!(hex_encoded.len(), DIGEST_HEX_LEN);
let digest = <[u8; SHA256_LEN]>::from_hex(hex_encoded)
.map_err(|_| ImageDigestParseError::HexDecodeError)?;
Ok(Self {
digest: storage::Digest::new(digest),
})
}
}
impl Display for ImageDigest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sha256:{}", self.digest)
}
}
async fn upload_add_chunk(
State(registry): State<Arc<ContainerRegistry>>,
Path(location): Path<ImageLocation>,
Path(UploadId { upload }): Path<UploadId>,
creds: ValidCredentials,
request: axum::extract::Request,
) -> Result<UploadState, RegistryError> {
registry
.auth_provider
.image_permissions(&creds, &location)
.await
.require_write()?;
if request.headers().contains_key(RANGE) {
return Err(RegistryError::NotSupported(
"unsupported feature: chunked uploads",
));
}
let mut writer = registry.storage.get_upload_writer(0, upload).await?;
let mut body = request.into_body().into_data_stream();
let mut completed: u64 = 0;
while let Some(result) = body.next().await {
let chunk = result.map_err(RegistryError::IncomingReadFailed)?;
completed += chunk.len() as u64;
writer
.write_all(chunk.as_ref())
.await
.map_err(RegistryError::LocalWriteFailed)?;
}
writer
.flush()
.await
.map_err(RegistryError::LocalWriteFailed)?;
Ok(UploadState {
location,
completed: Some(completed),
upload,
})
}
#[derive(Debug, Deserialize)]
struct DigestQuery {
digest: ImageDigest,
}
async fn upload_finalize(
State(registry): State<Arc<ContainerRegistry>>,
Path((repository, image, upload)): Path<(String, String, Uuid)>,
Query(DigestQuery { digest }): Query<DigestQuery>,
creds: ValidCredentials,
request: axum::extract::Request,
) -> Result<Response<Body>, RegistryError> {
let location = ImageLocation::new(repository, image);
registry
.auth_provider
.image_permissions(&creds, &location)
.await
.require_write()?;
match request.headers().get(CONTENT_LENGTH) {
Some(value) => {
let num_bytes: u64 = value
.to_str()
.map_err(|err| RegistryError::ContentLengthMalformed(Box::new(err)))?
.parse()
.map_err(|err| RegistryError::ContentLengthMalformed(Box::new(err)))?;
if num_bytes != 0 {
return Err(RegistryError::NotSupported(
"missing content length not implemented",
));
}
}
None => {
}
}
registry
.storage
.finalize_upload(upload, digest.digest)
.await?;
info!(%upload, %digest, "new image uploaded");
Ok(Response::builder()
.status(StatusCode::CREATED)
.header("Docker-Content-Digest", digest.to_string())
.header(LOCATION, mk_upload_location(&location, upload))
.body(Body::empty())?)
}
async fn manifest_put(
State(registry): State<Arc<ContainerRegistry>>,
Path(manifest_reference): Path<ManifestReference>,
creds: ValidCredentials,
image_manifest_json: String,
) -> Result<Response<Body>, RegistryError> {
registry
.auth_provider
.image_permissions(&creds, manifest_reference.location())
.await
.require_write()?;
let digest = registry
.storage
.put_manifest(&manifest_reference, image_manifest_json.as_bytes())
.await?;
info!(%manifest_reference, %digest, "new manifest received");
registry
.hooks
.on_manifest_uploaded(&manifest_reference)
.await;
Ok(Response::builder()
.status(StatusCode::CREATED)
.header(
LOCATION,
mk_manifest_location(
manifest_reference.location(),
manifest_reference.reference(),
),
)
.header(CONTENT_LENGTH, 0)
.header(
"Docker-Content-Digest",
ImageDigest::new(digest).to_string(),
)
.body(Body::empty())
.unwrap())
}
async fn manifest_get(
State(registry): State<Arc<ContainerRegistry>>,
Path(manifest_reference): Path<ManifestReference>,
creds: ValidCredentials,
) -> Result<Response<Body>, RegistryError> {
registry
.auth_provider
.image_permissions(&creds, manifest_reference.location())
.await
.require_read()?;
let manifest_json = registry
.storage
.get_manifest(&manifest_reference)
.await?
.ok_or(RegistryError::NotFound)?;
let manifest: ImageManifest =
serde_json::from_slice(&manifest_json).map_err(RegistryError::ParseManifest)?;
Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_LENGTH, manifest_json.len())
.header(CONTENT_TYPE, manifest.media_type())
.body(manifest_json.into())
.unwrap())
}