1#![doc = include_str!("../README.md")]
2
3pub mod auth;
43pub mod hooks;
44pub mod storage;
45#[cfg(any(feature = "test-support", test))]
46pub mod test_support;
47#[cfg(test)]
48mod tests;
49mod types;
50mod www_authenticate;
51
52use std::{
53 fmt::{self, Display},
54 io,
55 path::PathBuf,
56 str::FromStr,
57 sync::Arc,
58};
59
60use self::{
61 auth::ValidCredentials,
62 storage::{FilesystemStorage, ImageLocation, RegistryStorage},
63 types::{ImageManifest, OciError, OciErrors},
64};
65use auth::{MissingPermission, Permissions};
66use axum::{
67 body::Body,
68 extract::{Path, Query, State},
69 http::{
70 header::{CONTENT_LENGTH, CONTENT_TYPE, LOCATION, RANGE},
71 StatusCode,
72 },
73 response::{IntoResponse, Response},
74 routing::{get, head, patch, post, put},
75 Router,
76};
77use futures::stream::StreamExt;
78use hex::FromHex;
79use serde::{Deserialize, Deserializer, Serialize};
80use storage::Reference;
81use thiserror::Error;
82use tokio::io::AsyncWriteExt;
83use tokio_util::io::ReaderStream;
84use tracing::info;
85use uuid::Uuid;
86
87pub(crate) use {
88 auth::{AuthProvider, Unverified},
89 hooks::RegistryHooks,
90 storage::{FilesystemStorageError, ManifestReference},
91};
92
93#[derive(Debug, Error)]
98pub enum RegistryError {
99 #[error("missing item")]
101 NotFound,
102 #[error("permission denied")]
104 PermissionDenied(#[from] MissingPermission),
105 #[error(transparent)]
107 Storage(#[from] storage::Error),
109 #[error("could not parse manifest")]
111 ParseManifest(serde_json::Error),
112 #[error("feature not supported: {0}")]
114 NotSupported(&'static str),
115 #[error("error parsing content length")]
117 ContentLengthMalformed(#[source] Box<dyn std::error::Error + Send + Sync>),
118 #[error("failed to read incoming data stream")]
120 IncomingReadFailed(#[source] axum::Error),
121 #[error("local write failed")]
123 LocalWriteFailed(#[source] io::Error),
124 #[error("axum http error")]
126 AxumHttp(#[from] axum::http::Error),
128}
129
130impl IntoResponse for RegistryError {
131 #[inline(always)]
132 fn into_response(self) -> Response {
133 match self {
134 RegistryError::NotFound => (
136 StatusCode::NOT_FOUND,
137 OciErrors::single(OciError::new(types::ErrorCode::BlobUnknown)),
138 )
139 .into_response(),
140 RegistryError::PermissionDenied(_) => (
141 StatusCode::FORBIDDEN,
142 "access to request resource was denied",
144 )
145 .into_response(),
146 RegistryError::Storage(err) => err.into_response(),
147 RegistryError::ParseManifest(err) => (
148 StatusCode::BAD_REQUEST,
149 format!("could not parse manifest: {}", err),
150 )
151 .into_response(),
152 RegistryError::NotSupported(feature) => (
153 StatusCode::INTERNAL_SERVER_ERROR,
154 format!("feature not supported: {}", feature),
155 )
156 .into_response(),
157 RegistryError::ContentLengthMalformed(err) => (
158 StatusCode::BAD_REQUEST,
159 format!("invalid content length value: {}", err),
160 )
161 .into_response(),
162 RegistryError::IncomingReadFailed(_err) => (
163 StatusCode::INTERNAL_SERVER_ERROR,
164 "could not read input stream",
165 )
166 .into_response(),
167 RegistryError::LocalWriteFailed(_err) => (
168 StatusCode::INTERNAL_SERVER_ERROR,
169 "could not write image locally",
170 )
171 .into_response(),
172 RegistryError::AxumHttp(_err) => (
173 StatusCode::INTERNAL_SERVER_ERROR,
174 "error building axum HTTP response",
176 )
177 .into_response(),
178 }
179 }
180}
181
182pub struct ContainerRegistry {
184 realm: String,
188 auth_provider: Arc<dyn AuthProvider>,
190 storage: Box<dyn RegistryStorage>,
192 hooks: Box<dyn RegistryHooks>,
194}
195
196impl ContainerRegistry {
197 pub fn builder() -> ContainerRegistryBuilder {
201 ContainerRegistryBuilder::default()
202 }
203
204 pub fn make_router(self: Arc<ContainerRegistry>) -> Router {
209 Router::new()
210 .route("/v2/", get(index_v2))
211 .route("/v2/:repository/:image/blobs/:digest", head(blob_check))
212 .route("/v2/:repository/:image/blobs/:digest", get(blob_get))
213 .route("/v2/:repository/:image/blobs/uploads/", post(upload_new))
214 .route(
215 "/v2/:repository/:image/uploads/:upload",
216 patch(upload_add_chunk),
217 )
218 .route(
219 "/v2/:repository/:image/uploads/:upload",
220 put(upload_finalize),
221 )
222 .route(
223 "/v2/:repository/:image/manifests/:reference",
224 put(manifest_put),
225 )
226 .route(
227 "/v2/:repository/:image/manifests/:reference",
228 get(manifest_get),
229 )
230 .with_state(self)
231 }
232}
233
234#[derive(Default)]
243pub struct ContainerRegistryBuilder {
244 storage: Option<PathBuf>,
246 hooks: Option<Box<dyn RegistryHooks>>,
248 auth_provider: Option<Arc<dyn AuthProvider>>,
250}
251
252impl ContainerRegistryBuilder {
253 pub fn auth_provider(mut self, auth_provider: Arc<dyn AuthProvider>) -> Self {
255 self.auth_provider = Some(auth_provider);
256 self
257 }
258
259 pub fn hooks(mut self, hooks: Box<dyn RegistryHooks>) -> Self {
261 self.hooks = Some(hooks);
262 self
263 }
264
265 pub fn storage<P>(mut self, storage: P) -> Self
267 where
268 P: Into<PathBuf>,
269 {
270 self.storage = Some(storage.into());
271 self
272 }
273
274 pub fn build(mut self) -> Result<Arc<ContainerRegistry>, FilesystemStorageError> {
280 let storage_path = self
281 .storage
282 .expect("attempted to construct registry with no storage path");
283 let storage = Box::new(FilesystemStorage::new(storage_path)?);
284 let auth_provider = self
285 .auth_provider
286 .take()
287 .unwrap_or_else(|| Arc::new(Permissions::NoAccess));
288 let hooks = self.hooks.take().unwrap_or_else(|| Box::new(()));
289 Ok(Arc::new(ContainerRegistry {
290 realm: "ContainerRegistry".to_string(),
291 auth_provider,
292 storage,
293 hooks,
294 }))
295 }
296}
297
298async fn index_v2(
303 State(registry): State<Arc<ContainerRegistry>>,
304 unverified: Unverified,
305) -> Response<Body> {
306 let realm = ®istry.realm;
307
308 if registry
311 .auth_provider
312 .check_credentials(&unverified)
313 .await
314 .is_some()
315 {
316 return Response::builder()
317 .status(StatusCode::OK)
318 .header("WWW-Authenticate", format!("Basic realm=\"{realm}\""))
319 .body(Body::empty())
320 .unwrap();
321 }
322
323 Response::builder()
325 .status(StatusCode::UNAUTHORIZED)
326 .header("WWW-Authenticate", format!("Basic realm=\"{realm}\""))
327 .body(Body::empty())
328 .unwrap()
329}
330
331async fn blob_check(
333 State(registry): State<Arc<ContainerRegistry>>,
334 Path((_, _, image)): Path<(String, String, ImageDigest)>,
335 creds: ValidCredentials,
336) -> Result<Response, RegistryError> {
337 registry
338 .auth_provider
339 .blob_permissions(&creds, &image)
340 .await
341 .require_read()?;
342
343 if let Some(metadata) = registry.storage.get_blob_metadata(image.digest).await? {
344 Ok(Response::builder()
345 .status(StatusCode::OK)
346 .header(CONTENT_LENGTH, metadata.size())
347 .header("Docker-Content-Digest", image.to_string())
348 .header(CONTENT_TYPE, "application/octet-stream")
349 .body(Body::empty())
350 .unwrap())
351 } else {
352 Ok(Response::builder()
353 .status(StatusCode::NOT_FOUND)
354 .body(Body::empty())
355 .unwrap())
356 }
357}
358
359async fn blob_get(
361 State(registry): State<Arc<ContainerRegistry>>,
362 Path((_, _, image)): Path<(String, String, ImageDigest)>,
363 creds: ValidCredentials,
364) -> Result<Response, RegistryError> {
365 registry
366 .auth_provider
367 .blob_permissions(&creds, &image)
368 .await
369 .require_read()?;
370
371 let reader = registry
374 .storage
375 .get_blob_reader(image.digest)
376 .await?
377 .ok_or(RegistryError::NotFound)?;
378
379 let stream = ReaderStream::new(reader);
380 let body = Body::from_stream(stream);
381
382 Ok(Response::builder()
383 .status(StatusCode::OK)
384 .body(body)
385 .expect("Building a streaming response with body works. qed"))
386}
387
388async fn upload_new(
390 State(registry): State<Arc<ContainerRegistry>>,
391 Path(location): Path<ImageLocation>,
392 creds: ValidCredentials,
393) -> Result<UploadState, RegistryError> {
394 registry
395 .auth_provider
396 .image_permissions(&creds, &location)
397 .await
398 .require_write()?;
399
400 let upload = registry.storage.begin_new_upload().await?;
402
403 Ok(UploadState {
404 location,
405 completed: None,
406 upload,
407 })
408}
409
410fn mk_upload_location(location: &ImageLocation, uuid: Uuid) -> String {
412 let repository = &location.repository();
413 let image = &location.image();
414 format!("/v2/{repository}/{image}/uploads/{uuid}")
415}
416
417fn mk_manifest_location(location: &ImageLocation, reference: &Reference) -> String {
419 let repository = &location.repository();
420 let image = &location.image();
421 format!("/v2/{repository}/{image}/manifests/{reference}")
422}
423
424#[derive(Debug)]
431struct UploadState {
432 location: ImageLocation,
434 completed: Option<u64>,
436 upload: Uuid,
438}
439
440impl IntoResponse for UploadState {
441 fn into_response(self) -> Response {
442 let mut builder = Response::builder()
443 .header(LOCATION, mk_upload_location(&self.location, self.upload))
444 .header(CONTENT_LENGTH, 0)
445 .header("Docker-Upload-UUID", self.upload.to_string());
446
447 if let Some(completed) = self.completed {
448 builder = builder
449 .header(RANGE, format!("0-{}", completed))
450 .status(StatusCode::ACCEPTED)
451 } else {
452 builder = builder
453 .header(CONTENT_LENGTH, 0)
454 .status(StatusCode::ACCEPTED);
455 }
457
458 builder.body(Body::empty()).unwrap()
459 }
460}
461
462#[derive(Copy, Clone, Debug, Deserialize)]
464struct UploadId {
465 upload: Uuid,
467}
468
469#[derive(Debug)]
470
471pub struct ImageDigest {
475 digest: storage::Digest,
477}
478
479impl Serialize for ImageDigest {
480 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
481 where
482 S: serde::Serializer,
483 {
484 let full = format!("sha256:{}", self.digest);
485 full.serialize(serializer)
486 }
487}
488
489impl<'de> Deserialize<'de> for ImageDigest {
490 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
491 where
492 D: Deserializer<'de>,
493 {
494 let raw = <String>::deserialize(deserializer)?;
496 raw.parse().map_err(serde::de::Error::custom)
497 }
498}
499
500impl ImageDigest {
501 #[inline(always)]
503 pub const fn new(digest: storage::Digest) -> Self {
504 Self { digest }
505 }
506
507 pub fn digest(&self) -> storage::Digest {
509 self.digest
510 }
511}
512
513#[derive(Debug, Error)]
515pub enum ImageDigestParseError {
516 #[error("wrong length")]
518 WrongLength,
519 #[error("wrong prefix")]
521 WrongPrefix,
522 #[error("hex decoding error")]
524 HexDecodeError,
525}
526
527impl FromStr for ImageDigest {
528 type Err = ImageDigestParseError;
529
530 fn from_str(raw: &str) -> Result<Self, Self::Err> {
531 const SHA256_LEN: usize = 32;
532 const PREFIX_LEN: usize = 7;
533 const DIGEST_HEX_LEN: usize = SHA256_LEN * 2;
534
535 if raw.len() != PREFIX_LEN + DIGEST_HEX_LEN {
536 return Err(ImageDigestParseError::WrongLength);
537 }
538
539 if !raw.starts_with("sha256:") {
540 return Err(ImageDigestParseError::WrongPrefix);
541 }
542
543 let hex_encoded = &raw[PREFIX_LEN..];
544 debug_assert_eq!(hex_encoded.len(), DIGEST_HEX_LEN);
545
546 let digest = <[u8; SHA256_LEN]>::from_hex(hex_encoded)
547 .map_err(|_| ImageDigestParseError::HexDecodeError)?;
548
549 Ok(Self {
550 digest: storage::Digest::new(digest),
551 })
552 }
553}
554
555impl Display for ImageDigest {
556 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
557 write!(f, "sha256:{}", self.digest)
558 }
559}
560
561async fn upload_add_chunk(
563 State(registry): State<Arc<ContainerRegistry>>,
564 Path(location): Path<ImageLocation>,
565 Path(UploadId { upload }): Path<UploadId>,
566 creds: ValidCredentials,
567 request: axum::extract::Request,
568) -> Result<UploadState, RegistryError> {
569 registry
570 .auth_provider
571 .image_permissions(&creds, &location)
572 .await
573 .require_write()?;
574
575 if request.headers().contains_key(RANGE) {
577 return Err(RegistryError::NotSupported(
578 "unsupported feature: chunked uploads",
579 ));
580 }
581
582 let mut writer = registry.storage.get_upload_writer(0, upload).await?;
583
584 let mut body = request.into_body().into_data_stream();
586
587 let mut completed: u64 = 0;
588 while let Some(result) = body.next().await {
589 let chunk = result.map_err(RegistryError::IncomingReadFailed)?;
590 completed += chunk.len() as u64;
591 writer
592 .write_all(chunk.as_ref())
593 .await
594 .map_err(RegistryError::LocalWriteFailed)?;
595 }
596
597 writer
598 .flush()
599 .await
600 .map_err(RegistryError::LocalWriteFailed)?;
601
602 Ok(UploadState {
603 location,
604 completed: Some(completed),
605 upload,
606 })
607}
608
609#[derive(Debug, Deserialize)]
613struct DigestQuery {
614 digest: ImageDigest,
616}
617
618async fn upload_finalize(
620 State(registry): State<Arc<ContainerRegistry>>,
621 Path((repository, image, upload)): Path<(String, String, Uuid)>,
622 Query(DigestQuery { digest }): Query<DigestQuery>,
623 creds: ValidCredentials,
624 request: axum::extract::Request,
625) -> Result<Response<Body>, RegistryError> {
626 let location = ImageLocation::new(repository, image);
627
628 registry
629 .auth_provider
630 .image_permissions(&creds, &location)
631 .await
632 .require_write()?;
633
634 match request.headers().get(CONTENT_LENGTH) {
636 Some(value) => {
637 let num_bytes: u64 = value
638 .to_str()
639 .map_err(|err| RegistryError::ContentLengthMalformed(Box::new(err)))?
640 .parse()
641 .map_err(|err| RegistryError::ContentLengthMalformed(Box::new(err)))?;
642 if num_bytes != 0 {
643 return Err(RegistryError::NotSupported(
644 "missing content length not implemented",
645 ));
646 }
647
648 }
650 None => {
651 }
653 }
654
655 registry
656 .storage
657 .finalize_upload(upload, digest.digest)
658 .await?;
659
660 info!(%upload, %digest, "new image uploaded");
661 Ok(Response::builder()
662 .status(StatusCode::CREATED)
663 .header("Docker-Content-Digest", digest.to_string())
664 .header(LOCATION, mk_upload_location(&location, upload))
665 .body(Body::empty())?)
666}
667
668async fn manifest_put(
670 State(registry): State<Arc<ContainerRegistry>>,
671 Path(manifest_reference): Path<ManifestReference>,
672 creds: ValidCredentials,
673 image_manifest_json: String,
674) -> Result<Response<Body>, RegistryError> {
675 registry
676 .auth_provider
677 .image_permissions(&creds, manifest_reference.location())
678 .await
679 .require_write()?;
680
681 let digest = registry
682 .storage
683 .put_manifest(&manifest_reference, image_manifest_json.as_bytes())
684 .await?;
685
686 info!(%manifest_reference, %digest, "new manifest received");
687 registry
689 .hooks
690 .on_manifest_uploaded(&manifest_reference)
691 .await;
692
693 Ok(Response::builder()
694 .status(StatusCode::CREATED)
695 .header(
696 LOCATION,
697 mk_manifest_location(
698 manifest_reference.location(),
699 manifest_reference.reference(),
700 ),
701 )
702 .header(CONTENT_LENGTH, 0)
703 .header(
704 "Docker-Content-Digest",
705 ImageDigest::new(digest).to_string(),
706 )
707 .body(Body::empty())
708 .unwrap())
709}
710
711async fn manifest_get(
713 State(registry): State<Arc<ContainerRegistry>>,
714 Path(manifest_reference): Path<ManifestReference>,
715 creds: ValidCredentials,
716) -> Result<Response<Body>, RegistryError> {
717 registry
718 .auth_provider
719 .image_permissions(&creds, manifest_reference.location())
720 .await
721 .require_read()?;
722
723 let manifest_json = registry
724 .storage
725 .get_manifest(&manifest_reference)
726 .await?
727 .ok_or(RegistryError::NotFound)?;
728
729 let manifest: ImageManifest =
730 serde_json::from_slice(&manifest_json).map_err(RegistryError::ParseManifest)?;
731
732 Ok(Response::builder()
733 .status(StatusCode::OK)
734 .header(CONTENT_LENGTH, manifest_json.len())
735 .header(CONTENT_TYPE, manifest.media_type())
736 .body(manifest_json.into())
737 .unwrap())
738}