container_registry/
lib.rs

1#![doc = include_str!("../README.md")]
2
3//! ## Use a library
4//!
5//! To use this crate as a library, use the [`ContainerRegistry`] type. Here is a minimal example,
6//! supplying a unit value (`()`) to indicate it does not use any hooks, and `true` as the auth
7//! provider, which will accept any username and password combination as valid:
8//!
9//! ```
10//!# use std::sync::Arc;
11//!# use axum::{extract::DefaultBodyLimit, Router};
12//! use container_registry::auth;
13//! use sec::Secret;
14//!
15//! // The registry requires an existing (empty) directory, which it will initialize.
16//! let storage = tempdir::TempDir::new("container_registry_test")
17//!     .expect("could not create storage dir");
18//!
19//! // Setup an auth scheme that allows uploading with a master password, read-only
20//! // access otherwise.
21//! let auth = Arc::new(auth::Anonymous::new(
22//!     auth::Permissions::ReadOnly,
23//!     Secret::new("master password".to_owned())
24//! ));
25//!
26//! // Instantiate the registry.
27//! let registry = container_registry::ContainerRegistry::builder()
28//!     .storage(storage.path())  // Note: When testing, use `build_for_testing` instead.
29//!     .auth_provider(auth)
30//!     .build()
31//!     .expect("failed to instantiate registry");
32//!
33//! // Create an axum app router and mount our new registry on it.
34//! let app = Router::new()
35//!     .merge(registry.make_router())
36//!     // 1 GB body limit.
37//!     .layer(DefaultBodyLimit::max(1024 * 1024 * 1024));
38//! ```
39//!
40//! Afterwards, `app` can be launched via [`axum::serve()`], see its documentation for details.
41
42pub mod auth;
43pub mod hooks;
44pub mod storage;
45#[cfg(any(feature = "test-support", test))]
46pub mod test_support;
47#[cfg(test)]
48mod tests;
49mod types;
50mod www_authenticate;
51
52use std::{
53    fmt::{self, Display},
54    io,
55    path::PathBuf,
56    str::FromStr,
57    sync::Arc,
58};
59
60use self::{
61    auth::ValidCredentials,
62    storage::{FilesystemStorage, ImageLocation, RegistryStorage},
63    types::{ImageManifest, OciError, OciErrors},
64};
65use auth::{MissingPermission, Permissions};
66use axum::{
67    body::Body,
68    extract::{Path, Query, State},
69    http::{
70        header::{CONTENT_LENGTH, CONTENT_TYPE, LOCATION, RANGE},
71        StatusCode,
72    },
73    response::{IntoResponse, Response},
74    routing::{get, head, patch, post, put},
75    Router,
76};
77use futures::stream::StreamExt;
78use hex::FromHex;
79use serde::{Deserialize, Deserializer, Serialize};
80use storage::Reference;
81use thiserror::Error;
82use tokio::io::AsyncWriteExt;
83use tokio_util::io::ReaderStream;
84use tracing::info;
85use uuid::Uuid;
86
87pub(crate) use {
88    auth::{AuthProvider, Unverified},
89    hooks::RegistryHooks,
90    storage::{FilesystemStorageError, ManifestReference},
91};
92
93/// A container registry error.
94///
95/// Errors produced by the registry have a "safe" [`IntoResponse`] implementation, thus can be
96/// returned straight to the user without security concerns.
97#[derive(Debug, Error)]
98pub enum RegistryError {
99    /// A requested item (eg. manifest, blob, etc.) was not found.
100    #[error("missing item")]
101    NotFound,
102    /// Access to a resource was denied.
103    #[error("permission denied")]
104    PermissionDenied(#[from] MissingPermission),
105    /// Error in storage backend.
106    #[error(transparent)]
107    // TODO: Remove `from` impl.
108    Storage(#[from] storage::Error),
109    /// Error parsing image manifest.
110    #[error("could not parse manifest")]
111    ParseManifest(serde_json::Error),
112    /// A requested/required feature was not supported by this registry.
113    #[error("feature not supported: {0}")]
114    NotSupported(&'static str),
115    /// Invalid integer supplied for content length.
116    #[error("error parsing content length")]
117    ContentLengthMalformed(#[source] Box<dyn std::error::Error + Send + Sync>),
118    /// Incoming stream read error.
119    #[error("failed to read incoming data stream")]
120    IncomingReadFailed(#[source] axum::Error),
121    /// Failed to write local data to storage.
122    #[error("local write failed")]
123    LocalWriteFailed(#[source] io::Error),
124    /// Error building HTTP response.
125    #[error("axum http error")]
126    // Note: These should never occur.
127    AxumHttp(#[from] axum::http::Error),
128}
129
130impl IntoResponse for RegistryError {
131    #[inline(always)]
132    fn into_response(self) -> Response {
133        match self {
134            // TODO: Need better OciError handling here. Not everything is blob unknown.
135            RegistryError::NotFound => (
136                StatusCode::NOT_FOUND,
137                OciErrors::single(OciError::new(types::ErrorCode::BlobUnknown)),
138            )
139                .into_response(),
140            RegistryError::PermissionDenied(_) => (
141                StatusCode::FORBIDDEN,
142                // TODO: Should this be a proper OCI error?
143                "access to request resource was denied",
144            )
145                .into_response(),
146            RegistryError::Storage(err) => err.into_response(),
147            RegistryError::ParseManifest(err) => (
148                StatusCode::BAD_REQUEST,
149                format!("could not parse manifest: {}", err),
150            )
151                .into_response(),
152            RegistryError::NotSupported(feature) => (
153                StatusCode::INTERNAL_SERVER_ERROR,
154                format!("feature not supported: {}", feature),
155            )
156                .into_response(),
157            RegistryError::ContentLengthMalformed(err) => (
158                StatusCode::BAD_REQUEST,
159                format!("invalid content length value: {}", err),
160            )
161                .into_response(),
162            RegistryError::IncomingReadFailed(_err) => (
163                StatusCode::INTERNAL_SERVER_ERROR,
164                "could not read input stream",
165            )
166                .into_response(),
167            RegistryError::LocalWriteFailed(_err) => (
168                StatusCode::INTERNAL_SERVER_ERROR,
169                "could not write image locally",
170            )
171                .into_response(),
172            RegistryError::AxumHttp(_err) => (
173                StatusCode::INTERNAL_SERVER_ERROR,
174                // Fixed message, we don't want to leak anything. This should never happen anyway.
175                "error building axum HTTP response",
176            )
177                .into_response(),
178        }
179    }
180}
181
182/// A container registry storing OCI containers.
183pub struct ContainerRegistry {
184    /// The realm name for the registry.
185    ///
186    /// Solely used for HTTP auth.
187    realm: String,
188    /// An implementation for authentication.
189    auth_provider: Arc<dyn AuthProvider>,
190    /// A storage backend for the registry.
191    storage: Box<dyn RegistryStorage>,
192    /// A hook consumer for the registry.
193    hooks: Box<dyn RegistryHooks>,
194}
195
196impl ContainerRegistry {
197    /// Creates a new builder for a [`ContainerRegistry`].
198    ///
199    /// See documentation of [`ContainerRegistryBuilder`] for details.
200    pub fn builder() -> ContainerRegistryBuilder {
201        ContainerRegistryBuilder::default()
202    }
203
204    /// Builds an [`axum::routing::Router`] for this registry.
205    ///
206    /// Produces the core entry point for the registry; create and mount the router into an `axum`
207    /// application to use it.
208    pub fn make_router(self: Arc<ContainerRegistry>) -> Router {
209        Router::new()
210            .route("/v2/", get(index_v2))
211            .route("/v2/:repository/:image/blobs/:digest", head(blob_check))
212            .route("/v2/:repository/:image/blobs/:digest", get(blob_get))
213            .route("/v2/:repository/:image/blobs/uploads/", post(upload_new))
214            .route(
215                "/v2/:repository/:image/uploads/:upload",
216                patch(upload_add_chunk),
217            )
218            .route(
219                "/v2/:repository/:image/uploads/:upload",
220                put(upload_finalize),
221            )
222            .route(
223                "/v2/:repository/:image/manifests/:reference",
224                put(manifest_put),
225            )
226            .route(
227                "/v2/:repository/:image/manifests/:reference",
228                get(manifest_get),
229            )
230            .with_state(self)
231    }
232}
233
234/// Builder for a new instance of the container registry.
235///
236/// Requires a storage to be set, either by calling [`Self::storage`] or constructing using
237/// [`Self::build_for_testing()`], which requires the `test-support` feature and will use
238/// a temporary directory.
239///
240/// By default, no hooks are set up and the auth provider requires authentication, but does not
241/// grant access to anything.
242#[derive(Default)]
243pub struct ContainerRegistryBuilder {
244    /// Storage to use.
245    storage: Option<PathBuf>,
246    /// Hooks to use.
247    hooks: Option<Box<dyn RegistryHooks>>,
248    /// Auth provider to use.
249    auth_provider: Option<Arc<dyn AuthProvider>>,
250}
251
252impl ContainerRegistryBuilder {
253    /// Sets the auth provider for the new registry.
254    pub fn auth_provider(mut self, auth_provider: Arc<dyn AuthProvider>) -> Self {
255        self.auth_provider = Some(auth_provider);
256        self
257    }
258
259    /// Sets hooks for the new registry to call.
260    pub fn hooks(mut self, hooks: Box<dyn RegistryHooks>) -> Self {
261        self.hooks = Some(hooks);
262        self
263    }
264
265    /// Set the storage path for the new registry.
266    pub fn storage<P>(mut self, storage: P) -> Self
267    where
268        P: Into<PathBuf>,
269    {
270        self.storage = Some(storage.into());
271        self
272    }
273
274    /// Constructs a new registry.
275    ///
276    /// # Panics
277    ///
278    /// Will panic if not storage has been set through [`Self::storage`].
279    pub fn build(mut self) -> Result<Arc<ContainerRegistry>, FilesystemStorageError> {
280        let storage_path = self
281            .storage
282            .expect("attempted to construct registry with no storage path");
283        let storage = Box::new(FilesystemStorage::new(storage_path)?);
284        let auth_provider = self
285            .auth_provider
286            .take()
287            .unwrap_or_else(|| Arc::new(Permissions::NoAccess));
288        let hooks = self.hooks.take().unwrap_or_else(|| Box::new(()));
289        Ok(Arc::new(ContainerRegistry {
290            realm: "ContainerRegistry".to_string(),
291            auth_provider,
292            storage,
293            hooks,
294        }))
295    }
296}
297
298/// Registry index
299///
300/// Returns an empty HTTP OK response if provided credentials are okay, otherwise returns
301/// UNAUTHORIZED.
302async fn index_v2(
303    State(registry): State<Arc<ContainerRegistry>>,
304    unverified: Unverified,
305) -> Response<Body> {
306    let realm = &registry.realm;
307
308    // Both anonymous and named users should be verified to be able to get index. Restricted access
309    // is handled identically for both via the rules set within the registry constructor.
310    if registry
311        .auth_provider
312        .check_credentials(&unverified)
313        .await
314        .is_some()
315    {
316        return Response::builder()
317            .status(StatusCode::OK)
318            .header("WWW-Authenticate", format!("Basic realm=\"{realm}\""))
319            .body(Body::empty())
320            .unwrap();
321    }
322
323    // Return `UNAUTHORIZED`, since we want the client to supply credentials.
324    Response::builder()
325        .status(StatusCode::UNAUTHORIZED)
326        .header("WWW-Authenticate", format!("Basic realm=\"{realm}\""))
327        .body(Body::empty())
328        .unwrap()
329}
330
331/// Returns metadata of a specific image blob.
332async fn blob_check(
333    State(registry): State<Arc<ContainerRegistry>>,
334    Path((_, _, image)): Path<(String, String, ImageDigest)>,
335    creds: ValidCredentials,
336) -> Result<Response, RegistryError> {
337    registry
338        .auth_provider
339        .blob_permissions(&creds, &image)
340        .await
341        .require_read()?;
342
343    if let Some(metadata) = registry.storage.get_blob_metadata(image.digest).await? {
344        Ok(Response::builder()
345            .status(StatusCode::OK)
346            .header(CONTENT_LENGTH, metadata.size())
347            .header("Docker-Content-Digest", image.to_string())
348            .header(CONTENT_TYPE, "application/octet-stream")
349            .body(Body::empty())
350            .unwrap())
351    } else {
352        Ok(Response::builder()
353            .status(StatusCode::NOT_FOUND)
354            .body(Body::empty())
355            .unwrap())
356    }
357}
358
359/// Returns a specific image blob.
360async fn blob_get(
361    State(registry): State<Arc<ContainerRegistry>>,
362    Path((_, _, image)): Path<(String, String, ImageDigest)>,
363    creds: ValidCredentials,
364) -> Result<Response, RegistryError> {
365    registry
366        .auth_provider
367        .blob_permissions(&creds, &image)
368        .await
369        .require_read()?;
370
371    // TODO: Get size for `Content-length` header.
372
373    let reader = registry
374        .storage
375        .get_blob_reader(image.digest)
376        .await?
377        .ok_or(RegistryError::NotFound)?;
378
379    let stream = ReaderStream::new(reader);
380    let body = Body::from_stream(stream);
381
382    Ok(Response::builder()
383        .status(StatusCode::OK)
384        .body(body)
385        .expect("Building a streaming response with body works. qed"))
386}
387
388/// Initiates a new blob upload.
389async fn upload_new(
390    State(registry): State<Arc<ContainerRegistry>>,
391    Path(location): Path<ImageLocation>,
392    creds: ValidCredentials,
393) -> Result<UploadState, RegistryError> {
394    registry
395        .auth_provider
396        .image_permissions(&creds, &location)
397        .await
398        .require_write()?;
399
400    // Initiate a new upload
401    let upload = registry.storage.begin_new_upload().await?;
402
403    Ok(UploadState {
404        location,
405        completed: None,
406        upload,
407    })
408}
409
410/// Returns the URI for a specific part of an upload.
411fn mk_upload_location(location: &ImageLocation, uuid: Uuid) -> String {
412    let repository = &location.repository();
413    let image = &location.image();
414    format!("/v2/{repository}/{image}/uploads/{uuid}")
415}
416
417/// Returns the URI for a specific part of an upload.
418fn mk_manifest_location(location: &ImageLocation, reference: &Reference) -> String {
419    let repository = &location.repository();
420    let image = &location.image();
421    format!("/v2/{repository}/{image}/manifests/{reference}")
422}
423
424/// Image upload state.
425///
426/// Represents the state of a partial upload of a specific blob, which may be uploaded in chunks.
427///
428/// The OCI protocol requires the upload state communicated back through HTTP headers, this type
429/// represents said information.
430#[derive(Debug)]
431struct UploadState {
432    /// The location of the image.
433    location: ImageLocation,
434    /// The amount of bytes completed.
435    completed: Option<u64>,
436    /// The UUID for this specific upload part.
437    upload: Uuid,
438}
439
440impl IntoResponse for UploadState {
441    fn into_response(self) -> Response {
442        let mut builder = Response::builder()
443            .header(LOCATION, mk_upload_location(&self.location, self.upload))
444            .header(CONTENT_LENGTH, 0)
445            .header("Docker-Upload-UUID", self.upload.to_string());
446
447        if let Some(completed) = self.completed {
448            builder = builder
449                .header(RANGE, format!("0-{}", completed))
450                .status(StatusCode::ACCEPTED)
451        } else {
452            builder = builder
453                .header(CONTENT_LENGTH, 0)
454                .status(StatusCode::ACCEPTED);
455            // The spec says to use `CREATED`, but only `ACCEPTED` works?
456        }
457
458        builder.body(Body::empty()).unwrap()
459    }
460}
461
462/// An upload ID.
463#[derive(Copy, Clone, Debug, Deserialize)]
464struct UploadId {
465    /// The UUID representing this upload.
466    upload: Uuid,
467}
468
469#[derive(Debug)]
470
471/// An image hash.
472///
473/// Currently only SHA256 hashes are supported.
474pub struct ImageDigest {
475    /// The actual image digest.
476    digest: storage::Digest,
477}
478
479impl Serialize for ImageDigest {
480    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
481    where
482        S: serde::Serializer,
483    {
484        let full = format!("sha256:{}", self.digest);
485        full.serialize(serializer)
486    }
487}
488
489impl<'de> Deserialize<'de> for ImageDigest {
490    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
491    where
492        D: Deserializer<'de>,
493    {
494        // Note: For some reason, `&str` here causes parsing inside query parameters to fail.
495        let raw = <String>::deserialize(deserializer)?;
496        raw.parse().map_err(serde::de::Error::custom)
497    }
498}
499
500impl ImageDigest {
501    /// Creats a new image hash from an existing digest.
502    #[inline(always)]
503    pub const fn new(digest: storage::Digest) -> Self {
504        Self { digest }
505    }
506
507    /// Returns the actual digest.
508    pub fn digest(&self) -> storage::Digest {
509        self.digest
510    }
511}
512
513/// Error parsing a specific image digest.
514#[derive(Debug, Error)]
515pub enum ImageDigestParseError {
516    /// The given digest was of the wrong length.
517    #[error("wrong length")]
518    WrongLength,
519    /// The given digest had an invalid or unsupported prefix.
520    #[error("wrong prefix")]
521    WrongPrefix,
522    /// The hex encoding was not valid.
523    #[error("hex decoding error")]
524    HexDecodeError,
525}
526
527impl FromStr for ImageDigest {
528    type Err = ImageDigestParseError;
529
530    fn from_str(raw: &str) -> Result<Self, Self::Err> {
531        const SHA256_LEN: usize = 32;
532        const PREFIX_LEN: usize = 7;
533        const DIGEST_HEX_LEN: usize = SHA256_LEN * 2;
534
535        if raw.len() != PREFIX_LEN + DIGEST_HEX_LEN {
536            return Err(ImageDigestParseError::WrongLength);
537        }
538
539        if !raw.starts_with("sha256:") {
540            return Err(ImageDigestParseError::WrongPrefix);
541        }
542
543        let hex_encoded = &raw[PREFIX_LEN..];
544        debug_assert_eq!(hex_encoded.len(), DIGEST_HEX_LEN);
545
546        let digest = <[u8; SHA256_LEN]>::from_hex(hex_encoded)
547            .map_err(|_| ImageDigestParseError::HexDecodeError)?;
548
549        Ok(Self {
550            digest: storage::Digest::new(digest),
551        })
552    }
553}
554
555impl Display for ImageDigest {
556    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
557        write!(f, "sha256:{}", self.digest)
558    }
559}
560
561/// Adds a chunk to an existing upload.
562async fn upload_add_chunk(
563    State(registry): State<Arc<ContainerRegistry>>,
564    Path(location): Path<ImageLocation>,
565    Path(UploadId { upload }): Path<UploadId>,
566    creds: ValidCredentials,
567    request: axum::extract::Request,
568) -> Result<UploadState, RegistryError> {
569    registry
570        .auth_provider
571        .image_permissions(&creds, &location)
572        .await
573        .require_write()?;
574
575    // Check if we have a range - if so, its an unsupported feature, namely monolith uploads.
576    if request.headers().contains_key(RANGE) {
577        return Err(RegistryError::NotSupported(
578            "unsupported feature: chunked uploads",
579        ));
580    }
581
582    let mut writer = registry.storage.get_upload_writer(0, upload).await?;
583
584    // We'll get the entire file in one go, no range header == monolithic uploads.
585    let mut body = request.into_body().into_data_stream();
586
587    let mut completed: u64 = 0;
588    while let Some(result) = body.next().await {
589        let chunk = result.map_err(RegistryError::IncomingReadFailed)?;
590        completed += chunk.len() as u64;
591        writer
592            .write_all(chunk.as_ref())
593            .await
594            .map_err(RegistryError::LocalWriteFailed)?;
595    }
596
597    writer
598        .flush()
599        .await
600        .map_err(RegistryError::LocalWriteFailed)?;
601
602    Ok(UploadState {
603        location,
604        completed: Some(completed),
605        upload,
606    })
607}
608
609/// An image digest on a query string.
610///
611/// Newtype to allow [`axum::extract::Query`] to parse it.
612#[derive(Debug, Deserialize)]
613struct DigestQuery {
614    /// The image in question.
615    digest: ImageDigest,
616}
617
618/// Finishes an upload.
619async fn upload_finalize(
620    State(registry): State<Arc<ContainerRegistry>>,
621    Path((repository, image, upload)): Path<(String, String, Uuid)>,
622    Query(DigestQuery { digest }): Query<DigestQuery>,
623    creds: ValidCredentials,
624    request: axum::extract::Request,
625) -> Result<Response<Body>, RegistryError> {
626    let location = ImageLocation::new(repository, image);
627
628    registry
629        .auth_provider
630        .image_permissions(&creds, &location)
631        .await
632        .require_write()?;
633
634    // We do not support the final chunk in the `PUT` call, so ensure that's not the case.
635    match request.headers().get(CONTENT_LENGTH) {
636        Some(value) => {
637            let num_bytes: u64 = value
638                .to_str()
639                .map_err(|err| RegistryError::ContentLengthMalformed(Box::new(err)))?
640                .parse()
641                .map_err(|err| RegistryError::ContentLengthMalformed(Box::new(err)))?;
642            if num_bytes != 0 {
643                return Err(RegistryError::NotSupported(
644                    "missing content length not implemented",
645                ));
646            }
647
648            // 0 is the only acceptable value here.
649        }
650        None => {
651            // Omitting is fine, indicating no body.
652        }
653    }
654
655    registry
656        .storage
657        .finalize_upload(upload, digest.digest)
658        .await?;
659
660    info!(%upload, %digest, "new image uploaded");
661    Ok(Response::builder()
662        .status(StatusCode::CREATED)
663        .header("Docker-Content-Digest", digest.to_string())
664        .header(LOCATION, mk_upload_location(&location, upload))
665        .body(Body::empty())?)
666}
667
668/// Uploads a manifest.
669async fn manifest_put(
670    State(registry): State<Arc<ContainerRegistry>>,
671    Path(manifest_reference): Path<ManifestReference>,
672    creds: ValidCredentials,
673    image_manifest_json: String,
674) -> Result<Response<Body>, RegistryError> {
675    registry
676        .auth_provider
677        .image_permissions(&creds, manifest_reference.location())
678        .await
679        .require_write()?;
680
681    let digest = registry
682        .storage
683        .put_manifest(&manifest_reference, image_manifest_json.as_bytes())
684        .await?;
685
686    info!(%manifest_reference, %digest, "new manifest received");
687    // Completed upload, call hook:
688    registry
689        .hooks
690        .on_manifest_uploaded(&manifest_reference)
691        .await;
692
693    Ok(Response::builder()
694        .status(StatusCode::CREATED)
695        .header(
696            LOCATION,
697            mk_manifest_location(
698                manifest_reference.location(),
699                manifest_reference.reference(),
700            ),
701        )
702        .header(CONTENT_LENGTH, 0)
703        .header(
704            "Docker-Content-Digest",
705            ImageDigest::new(digest).to_string(),
706        )
707        .body(Body::empty())
708        .unwrap())
709}
710
711/// Retrieves a manifest.
712async fn manifest_get(
713    State(registry): State<Arc<ContainerRegistry>>,
714    Path(manifest_reference): Path<ManifestReference>,
715    creds: ValidCredentials,
716) -> Result<Response<Body>, RegistryError> {
717    registry
718        .auth_provider
719        .image_permissions(&creds, manifest_reference.location())
720        .await
721        .require_read()?;
722
723    let manifest_json = registry
724        .storage
725        .get_manifest(&manifest_reference)
726        .await?
727        .ok_or(RegistryError::NotFound)?;
728
729    let manifest: ImageManifest =
730        serde_json::from_slice(&manifest_json).map_err(RegistryError::ParseManifest)?;
731
732    Ok(Response::builder()
733        .status(StatusCode::OK)
734        .header(CONTENT_LENGTH, manifest_json.len())
735        .header(CONTENT_TYPE, manifest.media_type())
736        .body(manifest_json.into())
737        .unwrap())
738}