1use std::{
8 fmt::{self, Display},
9 fs,
10 io::{self, Read},
11 path::{Path, PathBuf},
12 str::FromStr,
13};
14
15use axum::{async_trait, http::StatusCode, response::IntoResponse};
16use serde::{Deserialize, Serialize};
17use sha2::Digest as Sha2Digest;
18use thiserror::Error;
19use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite};
20use uuid::Uuid;
21
22use super::{types::ImageManifest, ImageDigest};
23
/// Length of a SHA-256 digest, in bytes.
pub const SHA256_LEN: usize = 32;
26
/// Size, in bytes (1 MiB), of the read buffer used when hashing an upload's contents.
const BUFFER_SIZE: usize = 1024 * 1024;

/// A SHA-256 digest, stored as its raw bytes.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize)]
pub struct Digest([u8; SHA256_LEN]);
34
35impl Digest {
36 pub const fn new(bytes: [u8; SHA256_LEN]) -> Self {
38 Self(bytes)
39 }
40
41 pub fn from_contents(contents: &[u8]) -> Self {
43 let mut hasher = sha2::Sha256::new();
44 hasher.update(contents);
45
46 Self::new(hasher.finalize().into())
47 }
48}
49
50impl Display for Digest {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.write_str(&hex::encode(&self.0[..]))
53 }
54}
55
56#[derive(Debug, Deserialize)]
57struct LayerManifest {
58 #[serde(rename = "camelCase")]
59 #[allow(dead_code)] blob_sum: String,
61}
62
/// Location of an image, made up of a repository name and an image name.
///
/// Its `Display` output is `repository/image`.
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct ImageLocation {
    /// Name of the repository the image belongs to.
    repository: String,
    /// Name of the image inside the repository.
    image: String,
}
76
77impl Display for ImageLocation {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 write!(f, "{}/{}", self.repository, self.image)
80 }
81}
82
/// Reference to a manifest: an image location plus a tag or digest.
///
/// Its `Display` output is `repository/image:reference`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ManifestReference {
    /// Where the image lives. Flattened, so `repository` and `image` appear as sibling
    /// fields of `reference` in the serialized form.
    #[serde(flatten)]
    location: ImageLocation,
    /// Tag or digest selecting the manifest at that location.
    reference: Reference,
}
93
94impl Display for ManifestReference {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 write!(f, "{}:{}", self.location, self.reference)
97 }
98}
99
100impl ManifestReference {
101 pub fn new(location: ImageLocation, reference: Reference) -> Self {
103 Self {
104 location,
105 reference,
106 }
107 }
108
109 pub fn location(&self) -> &ImageLocation {
111 &self.location
112 }
113
114 pub fn reference(&self) -> &Reference {
116 &self.reference
117 }
118}
119
120impl ImageLocation {
121 pub fn new(repository: String, image: String) -> Self {
123 Self { repository, image }
124 }
125
126 #[inline(always)]
128 pub fn repository(&self) -> &str {
129 self.repository.as_ref()
130 }
131
132 #[inline(always)]
134 pub fn image(&self) -> &str {
135 self.image.as_ref()
136 }
137}
138
/// Selects a manifest either by tag name or by digest.
#[derive(Clone, Debug)]
pub enum Reference {
    /// A named tag.
    Tag(String),
    /// A content digest.
    Digest(Digest),
}
147
148impl<'de> Deserialize<'de> for Reference {
149 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
150 where
151 D: serde::Deserializer<'de>,
152 {
153 let raw = <&str>::deserialize(deserializer)?;
154
155 match ImageDigest::from_str(raw) {
156 Ok(digest) => Ok(Self::Digest(digest.digest)),
157 Err(_) => Ok(Self::Tag(raw.to_owned())),
158 }
159 }
160}
161
162impl Serialize for Reference {
163 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
164 where
165 S: serde::Serializer,
166 {
167 match self {
168 Reference::Tag(tag) => tag.serialize(serializer),
169 Reference::Digest(digest) => ImageDigest::new(*digest).serialize(serializer),
170 }
171 }
172}
173
174impl Reference {
175 #[inline(always)]
177 pub fn new_tag<S: ToString>(s: S) -> Self {
178 Reference::Tag(s.to_string())
179 }
180
181 #[inline(always)]
183 pub fn new_digest(d: Digest) -> Self {
184 Reference::Digest(d)
185 }
186
187 pub fn as_tag(&self) -> Option<&str> {
189 match self {
190 Reference::Tag(tag) => Some(tag),
191 Reference::Digest(_) => None,
192 }
193 }
194}
195
196impl Display for Reference {
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 match self {
199 Reference::Tag(tag) => Display::fmt(tag, f),
200 Reference::Digest(digest) => Display::fmt(digest, f),
201 }
202 }
203}
204
205#[derive(Debug, Error)]
207pub enum Error {
208 #[error("given upload does not exist")]
210 UploadDoesNotExit,
211 #[error("digest did not match")]
213 DigestMismatch,
214 #[error("io error")]
217 Io(io::Error),
218 #[error("background task panicked")]
220 BackgroundTaskPanicked(#[source] tokio::task::JoinError),
221 #[error("invalid image manifest")]
223 InvalidManifest(#[source] serde_json::Error),
224 #[error("cannot store manifest under hash")]
226 NotATag,
227}
228
229impl IntoResponse for Error {
230 #[inline]
231 fn into_response(self) -> axum::response::Response {
232 match self {
233 Error::UploadDoesNotExit => StatusCode::NOT_FOUND.into_response(),
234 Error::InvalidManifest(_) | Error::NotATag => StatusCode::BAD_REQUEST.into_response(),
235 Error::DigestMismatch | Error::Io(_) | Error::BackgroundTaskPanicked(_) => {
236 StatusCode::INTERNAL_SERVER_ERROR.into_response()
237 }
238 }
239 }
240}
241
/// Metadata describing a stored blob.
#[derive(Debug)]
pub(crate) struct BlobMetadata {
    /// Digest the blob was looked up by.
    #[allow(dead_code)]
    digest: Digest,
    /// Size of the blob in bytes.
    size: u64,
}
248
249impl BlobMetadata {
250 #[allow(dead_code)] pub(crate) fn digest(&self) -> Digest {
252 self.digest
253 }
254
255 pub(crate) fn size(&self) -> u64 {
256 self.size
257 }
258}
259
/// Storage backend interface for registry data: uploads, blobs and manifests.
#[async_trait]
pub(crate) trait RegistryStorage: Send + Sync {
    /// Starts a new upload and returns the identifier to use for it in later calls.
    async fn begin_new_upload(&self) -> Result<Uuid, Error>;

    /// Returns a reader over the blob with the given digest, or `None` if it is not stored.
    async fn get_blob_reader(
        &self,
        digest: Digest,
    ) -> Result<Option<Box<dyn AsyncRead + Send + Unpin>>, Error>;

    /// Returns metadata for the blob with the given digest, or `None` if it is not stored.
    async fn get_blob_metadata(&self, digest: Digest) -> Result<Option<BlobMetadata>, Error>;

    /// Returns a writer for adding data to the given upload.
    ///
    /// NOTE(review): `start_at` is presumably the byte offset at which writing should begin —
    /// confirm how implementations are expected to honor it.
    async fn get_upload_writer(
        &self,
        start_at: u64,
        upload: Uuid,
    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>, Error>;

    /// Completes the given upload, turning it into a blob stored under `hash`.
    ///
    /// The filesystem implementation in this file hashes the uploaded data and returns
    /// `Error::DigestMismatch` if the result differs from `hash`.
    async fn finalize_upload(&self, upload: Uuid, hash: Digest) -> Result<(), Error>;

    /// Returns the raw bytes of the referenced manifest, or `None` if it is not stored.
    async fn get_manifest(
        &self,
        manifest_reference: &ManifestReference,
    ) -> Result<Option<Vec<u8>>, Error>;

    /// Stores `manifest` under the given reference and returns a digest for it.
    ///
    /// The filesystem implementation in this file returns the digest of the manifest bytes
    /// and only accepts tag references (`Error::NotATag` otherwise).
    async fn put_manifest(
        &self,
        manifest_reference: &ManifestReference,
        manifest: &[u8],
    ) -> Result<Digest, Error>;
}
290
/// Errors that can occur while setting up a [`FilesystemStorage`].
#[derive(Debug, Error)]
pub enum FilesystemStorageError {
    /// The given root path could not be canonicalized.
    #[error("could not canonicalize root {}", path.display())]
    CouldNotCanonicalizeRoot {
        /// The root path as it was passed in.
        path: PathBuf,
        /// The underlying I/O error.
        #[source]
        err: io::Error,
    },
    /// One of the storage subdirectories could not be created.
    #[error("could not create directory {}", path.display())]
    FailedToCreateDir {
        /// The directory that could not be created.
        path: PathBuf,
        /// The underlying I/O error.
        #[source]
        err: io::Error,
    },
}
309
/// Registry storage backed by a directory tree on the local filesystem.
#[derive(Debug)]
pub(crate) struct FilesystemStorage {
    /// Directory holding in-progress uploads (`<root>/uploads`).
    uploads: PathBuf,
    /// Directory holding finalized blobs, one file per digest (`<root>/blobs`).
    blobs: PathBuf,
    /// Directory holding manifests, one file per digest (`<root>/manifests`).
    manifests: PathBuf,
    /// Directory holding tags, laid out as `<root>/tags/<repository>/<image>/<tag>`.
    tags: PathBuf,
    /// Relative prefix (`../../../manifests`) used for tag symlink targets.
    ///
    /// NOTE(review): despite the name mentioning blobs, the value points at the `manifests`
    /// directory as seen from a tag's parent directory — confirm the name is just historical.
    rel_manifest_to_blobs: PathBuf,
}
318
319impl FilesystemStorage {
320 pub(crate) fn new<P: AsRef<Path>>(root: P) -> Result<Self, FilesystemStorageError> {
321 let raw_root = root.as_ref();
322 let root = raw_root.canonicalize().map_err(|err| {
323 FilesystemStorageError::CouldNotCanonicalizeRoot {
324 path: raw_root.to_owned(),
325 err,
326 }
327 })?;
328
329 let uploads = root.join("uploads");
330 let blobs = root.join("blobs");
331 let manifests = root.join("manifests");
332 let tags = root.join("tags");
333 let rel_manifest_to_blobs = PathBuf::from("../../../manifests");
334
335 for dir in [&uploads, &blobs, &manifests, &tags] {
336 if !dir.exists() {
337 fs::create_dir(dir).map_err(|err| FilesystemStorageError::FailedToCreateDir {
338 path: dir.to_owned(),
339 err,
340 })?;
341 }
342 }
343
344 Ok(FilesystemStorage {
345 uploads,
346 blobs,
347 manifests,
348 tags,
349 rel_manifest_to_blobs,
350 })
351 }
352 fn blob_path(&self, digest: Digest) -> PathBuf {
353 self.blobs.join(format!("{}", digest))
354 }
355 fn upload_path(&self, upload: Uuid) -> PathBuf {
356 self.uploads.join(format!("{}.partial", upload))
357 }
358
359 fn manifest_path(&self, digest: Digest) -> PathBuf {
360 self.manifests.join(format!("{}", digest))
361 }
362
363 fn blob_rel_path(&self, digest: Digest) -> PathBuf {
364 self.rel_manifest_to_blobs.join(format!("{}", digest))
365 }
366
367 fn tag_path(&self, location: &ImageLocation, tag: &str) -> PathBuf {
368 self.tags
369 .join(location.repository())
370 .join(location.image())
371 .join(tag)
372 }
373
374 fn temp_tag_path(&self) -> PathBuf {
375 self.tags.join(Uuid::new_v4().to_string())
376 }
377}
378
379#[async_trait]
380impl RegistryStorage for FilesystemStorage {
381 async fn begin_new_upload(&self) -> Result<Uuid, Error> {
382 let upload = Uuid::new_v4();
383 let out_path = self.upload_path(upload);
384
385 let _file = tokio::fs::File::create(out_path).await.map_err(Error::Io)?;
387
388 Ok(upload)
389 }
390
391 async fn get_blob_metadata(&self, digest: Digest) -> Result<Option<BlobMetadata>, Error> {
392 let blob_path = self.blob_path(digest);
393
394 if !blob_path.exists() {
395 return Ok(None);
396 }
397
398 let metadata = tokio::fs::metadata(blob_path).await.map_err(Error::Io)?;
399
400 Ok(Some(BlobMetadata {
401 digest,
402 size: metadata.len(),
403 }))
404 }
405
406 async fn get_blob_reader(
407 &self,
408 digest: Digest,
409 ) -> Result<Option<Box<dyn AsyncRead + Send + Unpin>>, Error> {
410 let blob_path = self.blob_path(digest);
411
412 if !blob_path.exists() {
413 return Ok(None);
414 }
415
416 let reader = tokio::fs::File::open(blob_path).await.map_err(Error::Io)?;
417
418 Ok(Some(Box::new(reader)))
419 }
420
421 async fn get_upload_writer(
422 &self,
423 start_at: u64,
424 upload: Uuid,
425 ) -> Result<Box<dyn AsyncWrite + Send + Unpin>, Error> {
426 let location = self.upload_path(upload);
427
428 if !location.exists() {
429 return Err(Error::UploadDoesNotExit);
430 }
431
432 let mut file = tokio::fs::OpenOptions::new()
433 .append(true)
434 .truncate(false)
435 .open(location)
436 .await
437 .map_err(Error::Io)?;
438
439 file.seek(io::SeekFrom::Start(start_at))
440 .await
441 .map_err(Error::Io)?;
442
443 Ok(Box::new(file))
444 }
445
446 async fn finalize_upload(&self, upload: Uuid, digest: Digest) -> Result<(), Error> {
447 let upload_path = self.upload_path(upload);
451
452 if !upload_path.exists() {
453 return Err(Error::UploadDoesNotExit);
454 }
455
456 let actual = {
458 let upload_path = upload_path.clone();
459 tokio::task::spawn_blocking::<_, Result<Digest, Error>>(move || {
460 let mut src = fs::File::open(upload_path).map_err(Error::Io)?;
461
462 let mut buf = vec![0; BUFFER_SIZE];
464 let mut hasher = sha2::Sha256::new();
465
466 loop {
467 let read = src.read(buf.as_mut()).map_err(Error::Io)?;
468 if read == 0 {
469 break;
470 }
471 hasher.update(&buf[..read]);
472 }
473
474 let actual = hasher.finalize();
475 Ok(Digest::new(actual.into()))
476 })
477 }
478 .await
479 .map_err(Error::BackgroundTaskPanicked)??;
480
481 if actual != digest {
482 return Err(Error::DigestMismatch);
483 }
484
485 let dest = self.blob_path(digest);
487 tokio::fs::rename(upload_path, dest)
488 .await
489 .map_err(Error::Io)?;
490
491 Ok(())
493 }
494
495 async fn get_manifest(
496 &self,
497 manifest_reference: &ManifestReference,
498 ) -> Result<Option<Vec<u8>>, Error> {
499 let manifest_path = match manifest_reference.reference() {
500 Reference::Tag(ref tag) => self.tag_path(manifest_reference.location(), tag),
501 Reference::Digest(digest) => self.manifest_path(*digest),
502 };
503
504 match tokio::fs::read(manifest_path).await {
505 Ok(data) => Ok(Some(data)),
506 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
507 Err(e) => Err(Error::Io(e)),
508 }
509 }
510
511 async fn put_manifest(
512 &self,
513 manifest_reference: &ManifestReference,
514 manifest: &[u8],
515 ) -> Result<Digest, Error> {
516 let _manifest: ImageManifest =
518 serde_json::from_slice(manifest).map_err(Error::InvalidManifest)?;
519
520 let digest = Digest::from_contents(manifest);
521 let dest = self.manifest_path(digest);
522 tokio::fs::write(dest, &manifest).await.map_err(Error::Io)?;
523
524 let tag = self.tag_path(
525 manifest_reference.location(),
526 manifest_reference
527 .reference()
528 .as_tag()
529 .ok_or(Error::NotATag)?,
530 );
531
532 let tag_parent = tag.parent().expect("should have parent");
533
534 if !tag_parent.exists() {
535 tokio::fs::create_dir_all(tag_parent)
536 .await
537 .map_err(Error::Io)?;
538 }
539
540 let tmp_tag = self.temp_tag_path();
541
542 tokio::fs::symlink(self.blob_rel_path(digest), &tmp_tag)
543 .await
544 .map_err(Error::Io)?;
545 tokio::fs::rename(tmp_tag, tag).await.map_err(Error::Io)?;
546
547 Ok(digest)
548 }
549}