container_registry/storage.rs

//! Storage backends.
//!
//! The `container_registry` crate has a somewhat modular storage backend, but currently only
//! filesystem storage is supported. Contact the author if you'd like to see this change.
// Note: This module is in worse shape, documentation-wise, than the rest. Cleaning this up is the
//       first step towards supporting custom implementations.
use std::{
    fmt::{self, Display},
    fs,
    io::{self, Read},
    path::{Path, PathBuf},
    str::FromStr,
};

use axum::{async_trait, http::StatusCode, response::IntoResponse};
use serde::{Deserialize, Serialize};
use sha2::Digest as Sha2Digest;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite};
use uuid::Uuid;

use super::{types::ImageManifest, ImageDigest};

/// Length of a SHA256 hash in bytes.
pub const SHA256_LEN: usize = 32;

const BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB

/// An SHA256 digest.
///
/// The `container_registry` crate supports only `sha256` digests at this time.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize)]
pub struct Digest([u8; SHA256_LEN]);

impl Digest {
    /// Creates a digest from an existing hash.
    pub const fn new(bytes: [u8; SHA256_LEN]) -> Self {
        Self(bytes)
    }

    /// Creates a digest by hashing given contents.
    /// Creates a digest by hashing given contents.
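    ///
    /// A minimal usage sketch (the import path is an assumption; adjust it to however the crate
    /// re-exports [`Digest`]):
    ///
    /// ```ignore
    /// use container_registry::storage::Digest;
    ///
    /// // Hash some bytes; `Display` renders the digest as lowercase hex.
    /// let digest = Digest::from_contents(b"hello world");
    /// assert_eq!(digest.to_string().len(), 64);
    /// ```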
    pub fn from_contents(contents: &[u8]) -> Self {
        let mut hasher = sha2::Sha256::new();
        hasher.update(contents);

        Self::new(hasher.finalize().into())
    }
}

impl Display for Digest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&hex::encode(&self.0[..]))
    }
}

#[derive(Debug, Deserialize)]
struct LayerManifest {
    #[serde(rename = "blobSum")]
    #[allow(dead_code)] // TODO
    blob_sum: String,
}

/// Location of a given image.
///
/// In an open container registry, images are stored in what `container-registry` calls
/// "repository" and "image" pairs. For example, the container image specified as
/// `bitnami/nginx:latest` would have a repository of `bitnami`, an image of `nginx` and a tag
/// (which is not part of [`ImageLocation`]) of `latest`.
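///
/// A minimal construction sketch (import path assumed; adjust to the crate's re-exports):
///
/// ```ignore
/// use container_registry::storage::ImageLocation;
///
/// // `bitnami/nginx` split into its repository and image parts.
/// let location = ImageLocation::new("bitnami".to_owned(), "nginx".to_owned());
/// assert_eq!(location.to_string(), "bitnami/nginx");
/// ```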
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct ImageLocation {
    /// The repository part of the image location.
    repository: String,
    /// The image part of the image location.
    image: String,
}

impl Display for ImageLocation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.repository, self.image)
    }
}

/// Refers to a specific manifest.
///
/// Combines an [`ImageLocation`] with a [`Reference`], e.g. `bitnami/nginx:latest`, which has an
/// [`ImageLocation`] portion of `bitnami/nginx` and a [`Reference::Tag`] of `latest`.
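///
/// A minimal construction sketch (import path assumed):
///
/// ```ignore
/// use container_registry::storage::{ImageLocation, ManifestReference, Reference};
///
/// let manifest_ref = ManifestReference::new(
///     ImageLocation::new("bitnami".to_owned(), "nginx".to_owned()),
///     Reference::new_tag("latest"),
/// );
/// assert_eq!(manifest_ref.to_string(), "bitnami/nginx:latest");
/// ```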
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ManifestReference {
    #[serde(flatten)]
    location: ImageLocation,
    reference: Reference,
}

impl Display for ManifestReference {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}", self.location, self.reference)
    }
}

impl ManifestReference {
    /// Creates a new manifest reference.
    pub fn new(location: ImageLocation, reference: Reference) -> Self {
        Self {
            location,
            reference,
        }
    }

    /// Returns the location portion of the manifest reference.
    pub fn location(&self) -> &ImageLocation {
        &self.location
    }

    /// Returns the reference portion of the manifest reference.
    pub fn reference(&self) -> &Reference {
        &self.reference
    }
}

impl ImageLocation {
    /// Creates a new image location.
    pub fn new(repository: String, image: String) -> Self {
        Self { repository, image }
    }

    /// Returns the repository portion of the given image location.
    #[inline(always)]
    pub fn repository(&self) -> &str {
        self.repository.as_ref()
    }

    /// Returns the image portion of the given image location.
    #[inline(always)]
    pub fn image(&self) -> &str {
        self.image.as_ref()
    }
}

/// Reference to a specific version of an image.
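///
/// A reference is either a tag or a content digest; `Display` renders whichever form is stored.
/// A minimal usage sketch (import path assumed):
///
/// ```ignore
/// use container_registry::storage::Reference;
///
/// let by_tag = Reference::new_tag("latest");
/// assert_eq!(by_tag.as_tag(), Some("latest"));
/// assert_eq!(by_tag.to_string(), "latest");
/// ```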
#[derive(Clone, Debug)]
pub enum Reference {
    /// Image referenced by a given tag (e.g. `latest`).
    Tag(String),
    /// Image referenced by a given specific hash.
    Digest(Digest),
}

impl<'de> Deserialize<'de> for Reference {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let raw = <&str>::deserialize(deserializer)?;

        match ImageDigest::from_str(raw) {
            Ok(digest) => Ok(Self::Digest(digest.digest)),
            Err(_) => Ok(Self::Tag(raw.to_owned())),
        }
    }
}

impl Serialize for Reference {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            Reference::Tag(tag) => tag.serialize(serializer),
            Reference::Digest(digest) => ImageDigest::new(*digest).serialize(serializer),
        }
    }
}

impl Reference {
    /// Creates a new by-tag reference.
    #[inline(always)]
    pub fn new_tag<S: ToString>(s: S) -> Self {
        Reference::Tag(s.to_string())
    }

    /// Creates a new by-hash reference.
    #[inline(always)]
    pub fn new_digest(d: Digest) -> Self {
        Reference::Digest(d)
    }

    /// Returns the reference as a bare tag, if it is a tag.
    pub fn as_tag(&self) -> Option<&str> {
        match self {
            Reference::Tag(tag) => Some(tag),
            Reference::Digest(_) => None,
        }
    }
}

impl Display for Reference {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Reference::Tag(tag) => Display::fmt(tag, f),
            Reference::Digest(digest) => Display::fmt(digest, f),
        }
    }
}

/// A storage error.
#[derive(Debug, Error)]
pub enum Error {
    /// Attempted to submit data to an upload that does not exist.
    #[error("given upload does not exist")]
    UploadDoesNotExit,
    /// A content hash mismatched.
    #[error("digest did not match")]
    DigestMismatch,
    /// An IO error.
    // TODO: Not great to have a catch-all IO error, to be replaced later.
    #[error("io error")]
    Io(io::Error),
    /// A background task panicked.
    #[error("background task panicked")]
    BackgroundTaskPanicked(#[source] tokio::task::JoinError),
    /// Invalid image manifest submitted.
    #[error("invalid image manifest")]
    InvalidManifest(#[source] serde_json::Error),
    /// Attempted to store a manifest under a digest instead of a tag.
    #[error("cannot store manifest under hash")]
    NotATag,
}

impl IntoResponse for Error {
    #[inline]
    fn into_response(self) -> axum::response::Response {
        match self {
            Error::UploadDoesNotExit => StatusCode::NOT_FOUND.into_response(),
            Error::InvalidManifest(_) | Error::NotATag => StatusCode::BAD_REQUEST.into_response(),
            Error::DigestMismatch | Error::Io(_) | Error::BackgroundTaskPanicked(_) => {
                StatusCode::INTERNAL_SERVER_ERROR.into_response()
            }
        }
    }
}

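/// Metadata about a blob stored in the registry: its digest and size in bytes.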
#[derive(Debug)]
pub(crate) struct BlobMetadata {
    #[allow(dead_code)] // TODO
    digest: Digest,
    size: u64,
}

impl BlobMetadata {
    #[allow(dead_code)] // TODO
    pub(crate) fn digest(&self) -> Digest {
        self.digest
    }

    pub(crate) fn size(&self) -> u64 {
        self.size
    }
}

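/// Interface for a storage backend of the registry.
///
/// Blob uploads are session based: a session is opened with [`RegistryStorage::begin_new_upload`],
/// data is streamed through the writer returned by [`RegistryStorage::get_upload_writer`], and the
/// upload is sealed with [`RegistryStorage::finalize_upload`], which verifies the expected digest
/// before the blob becomes readable via [`RegistryStorage::get_blob_reader`]. Manifests are stored
/// and retrieved by [`ManifestReference`] through [`RegistryStorage::put_manifest`] and
/// [`RegistryStorage::get_manifest`].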
#[async_trait]
pub(crate) trait RegistryStorage: Send + Sync {
    async fn begin_new_upload(&self) -> Result<Uuid, Error>;

    async fn get_blob_reader(
        &self,
        digest: Digest,
    ) -> Result<Option<Box<dyn AsyncRead + Send + Unpin>>, Error>;

    async fn get_blob_metadata(&self, digest: Digest) -> Result<Option<BlobMetadata>, Error>;

    async fn get_upload_writer(
        &self,
        start_at: u64,
        upload: Uuid,
    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>, Error>;

    async fn finalize_upload(&self, upload: Uuid, hash: Digest) -> Result<(), Error>;

    async fn get_manifest(
        &self,
        manifest_reference: &ManifestReference,
    ) -> Result<Option<Vec<u8>>, Error>;

    async fn put_manifest(
        &self,
        manifest_reference: &ManifestReference,
        manifest: &[u8],
    ) -> Result<Digest, Error>;
}

/// A filesystem backend error.
#[derive(Debug, Error)]
pub enum FilesystemStorageError {
    /// The storage path given could not be canonicalized.
    #[error("could not canonicalize root {}", path.display())]
    CouldNotCanonicalizeRoot {
        path: PathBuf,
        #[source]
        err: io::Error,
    },
    /// Failed to create directory in storage path.
    #[error("could not create directory {}", path.display())]
    FailedToCreateDir {
        path: PathBuf,
        #[source]
        err: io::Error,
    },
}

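/// Filesystem-backed storage.
///
/// Keeps all data under a single root directory with four subdirectories: `uploads/` for partial
/// uploads, `blobs/` for content-addressed blobs, `manifests/` for manifests stored under their
/// digest, and `tags/<repository>/<image>/<tag>` symlinks pointing at the corresponding manifest.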
#[derive(Debug)]
pub(crate) struct FilesystemStorage {
    uploads: PathBuf,
    blobs: PathBuf,
    manifests: PathBuf,
    tags: PathBuf,
    rel_manifest_to_blobs: PathBuf,
}

impl FilesystemStorage {
    pub(crate) fn new<P: AsRef<Path>>(root: P) -> Result<Self, FilesystemStorageError> {
        let raw_root = root.as_ref();
        let root = raw_root.canonicalize().map_err(|err| {
            FilesystemStorageError::CouldNotCanonicalizeRoot {
                path: raw_root.to_owned(),
                err,
            }
        })?;

        let uploads = root.join("uploads");
        let blobs = root.join("blobs");
        let manifests = root.join("manifests");
        let tags = root.join("tags");
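        // Relative path from a tag at `tags/<repository>/<image>/<tag>` up to the registry root
        // and into `manifests/`; used as the symlink target when tagging a manifest.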
        let rel_manifest_to_blobs = PathBuf::from("../../../manifests");

        for dir in [&uploads, &blobs, &manifests, &tags] {
            if !dir.exists() {
                fs::create_dir(dir).map_err(|err| FilesystemStorageError::FailedToCreateDir {
                    path: dir.to_owned(),
                    err,
                })?;
            }
        }

        Ok(FilesystemStorage {
            uploads,
            blobs,
            manifests,
            tags,
            rel_manifest_to_blobs,
        })
    }

    fn blob_path(&self, digest: Digest) -> PathBuf {
        self.blobs.join(format!("{}", digest))
    }

    fn upload_path(&self, upload: Uuid) -> PathBuf {
        self.uploads.join(format!("{}.partial", upload))
    }

    fn manifest_path(&self, digest: Digest) -> PathBuf {
        self.manifests.join(format!("{}", digest))
    }

    fn blob_rel_path(&self, digest: Digest) -> PathBuf {
        self.rel_manifest_to_blobs.join(format!("{}", digest))
    }

    fn tag_path(&self, location: &ImageLocation, tag: &str) -> PathBuf {
        self.tags
            .join(location.repository())
            .join(location.image())
            .join(tag)
    }

    fn temp_tag_path(&self) -> PathBuf {
        self.tags.join(Uuid::new_v4().to_string())
    }
}

#[async_trait]
impl RegistryStorage for FilesystemStorage {
    async fn begin_new_upload(&self) -> Result<Uuid, Error> {
        let upload = Uuid::new_v4();
        let out_path = self.upload_path(upload);

        // Write zero-sized file.
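        // Creating the empty placeholder up front lets later calls tell an existing (but still
        // empty) upload apart from an unknown upload ID.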
        let _file = tokio::fs::File::create(out_path).await.map_err(Error::Io)?;

        Ok(upload)
    }

    async fn get_blob_metadata(&self, digest: Digest) -> Result<Option<BlobMetadata>, Error> {
        let blob_path = self.blob_path(digest);

        if !blob_path.exists() {
            return Ok(None);
        }

        let metadata = tokio::fs::metadata(blob_path).await.map_err(Error::Io)?;

        Ok(Some(BlobMetadata {
            digest,
            size: metadata.len(),
        }))
    }

    async fn get_blob_reader(
        &self,
        digest: Digest,
    ) -> Result<Option<Box<dyn AsyncRead + Send + Unpin>>, Error> {
        let blob_path = self.blob_path(digest);

        if !blob_path.exists() {
            return Ok(None);
        }

        let reader = tokio::fs::File::open(blob_path).await.map_err(Error::Io)?;

        Ok(Some(Box::new(reader)))
    }

    async fn get_upload_writer(
        &self,
        start_at: u64,
        upload: Uuid,
    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>, Error> {
        let location = self.upload_path(upload);

        if !location.exists() {
            return Err(Error::UploadDoesNotExit);
        }

        let mut file = tokio::fs::OpenOptions::new()
            .append(true)
            .truncate(false)
            .open(location)
            .await
            .map_err(Error::Io)?;

        file.seek(io::SeekFrom::Start(start_at))
            .await
            .map_err(Error::Io)?;

        Ok(Box::new(file))
    }

    async fn finalize_upload(&self, upload: Uuid, digest: Digest) -> Result<(), Error> {
        // Validate the uploaded partial file, then move it into the proper store.
        // TODO: Lock in place so that the hash cannot be corrupted/attacked.

        let upload_path = self.upload_path(upload);

        if !upload_path.exists() {
            return Err(Error::UploadDoesNotExit);
        }

        // We offload hashing to a blocking thread.
        let actual = {
            let upload_path = upload_path.clone();
            tokio::task::spawn_blocking::<_, Result<Digest, Error>>(move || {
                let mut src = fs::File::open(upload_path).map_err(Error::Io)?;

                // Uses `vec!` instead of `Box`, as initializing the latter blows the stack:
                let mut buf = vec![0; BUFFER_SIZE];
                let mut hasher = sha2::Sha256::new();

                loop {
                    let read = src.read(buf.as_mut()).map_err(Error::Io)?;
                    if read == 0 {
                        break;
                    }
                    hasher.update(&buf[..read]);
                }

                let actual = hasher.finalize();
                Ok(Digest::new(actual.into()))
            })
        }
        .await
        .map_err(Error::BackgroundTaskPanicked)??;

        if actual != digest {
            return Err(Error::DigestMismatch);
        }

        // The uploaded file matches; we can rename it now.
        let dest = self.blob_path(digest);
        tokio::fs::rename(upload_path, dest)
            .await
            .map_err(Error::Io)?;

        // All good.
        Ok(())
    }

    async fn get_manifest(
        &self,
        manifest_reference: &ManifestReference,
    ) -> Result<Option<Vec<u8>>, Error> {
        let manifest_path = match manifest_reference.reference() {
            Reference::Tag(ref tag) => self.tag_path(manifest_reference.location(), tag),
            Reference::Digest(digest) => self.manifest_path(*digest),
        };

        match tokio::fs::read(manifest_path).await {
            Ok(data) => Ok(Some(data)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(Error::Io(e)),
        }
    }

    async fn put_manifest(
        &self,
        manifest_reference: &ManifestReference,
        manifest: &[u8],
    ) -> Result<Digest, Error> {
        // TODO: Validate all blobs are completely uploaded.
        let _manifest: ImageManifest =
            serde_json::from_slice(manifest).map_err(Error::InvalidManifest)?;

        let digest = Digest::from_contents(manifest);
        let dest = self.manifest_path(digest);
        tokio::fs::write(dest, &manifest).await.map_err(Error::Io)?;

        let tag = self.tag_path(
            manifest_reference.location(),
            manifest_reference
                .reference()
                .as_tag()
                .ok_or(Error::NotATag)?,
        );

        let tag_parent = tag.parent().expect("should have parent");

        if !tag_parent.exists() {
            tokio::fs::create_dir_all(tag_parent)
                .await
                .map_err(Error::Io)?;
        }

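        // Create the symlink under a random temporary name first, then rename it over the tag
        // path, so an existing tag is swapped out in a single rename instead of being deleted
        // and recreated.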
        let tmp_tag = self.temp_tag_path();

        tokio::fs::symlink(self.blob_rel_path(digest), &tmp_tag)
            .await
            .map_err(Error::Io)?;
        tokio::fs::rename(tmp_tag, tag).await.map_err(Error::Io)?;

        Ok(digest)
    }
}