Skip to main content

ferro_oci_server/
registry.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Registry metadata plane.
3//!
4//! Spec: OCI Distribution Spec v1.1 §3 "Namespaces" and §4 "Pulling /
5//! Pushing".
6//!
7//! The blob bytes live in [`ferro_blob_store::BlobStore`]; this trait
8//! covers everything else the registry needs to persist — manifests
9//! (keyed by digest), tag -> digest aliases, in-flight upload
10//! sessions, and referrer lookups.
11//!
12//! Phase 1 ships [`InMemoryRegistryMeta`], which uses
13//! `parking_lot::RwLock` to guard a handful of `BTreeMap`s. A
14//! SQLite- / Postgres-backed impl lands in Phase 2.
15
16use std::collections::BTreeMap;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use ferro_blob_store::{Digest, Result};
22use parking_lot::RwLock;
23use serde::{Deserialize, Serialize};
24
25use crate::reference::Reference;
26use crate::upload::UploadState;
27
28/// Descriptor returned by the referrers API.
29///
30/// Spec §3.3: the response body is an OCI image index whose
31/// `manifests` array contains one of these per referrer.
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33pub struct ReferrerDescriptor {
34    /// Media type of the referring manifest.
35    #[serde(rename = "mediaType")]
36    pub media_type: String,
37    /// Digest of the referring manifest.
38    pub digest: Digest,
39    /// Size in bytes of the referring manifest.
40    pub size: u64,
41    /// Optional artifact type, used as the filter key.
42    #[serde(
43        rename = "artifactType",
44        default,
45        skip_serializing_if = "Option::is_none"
46    )]
47    pub artifact_type: Option<String>,
48    /// Optional annotations.
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub annotations: Option<BTreeMap<String, String>>,
51}
52
53/// Metadata-plane operations required by the OCI handlers.
54#[async_trait]
55pub trait RegistryMeta: Send + Sync {
56    /// Persist a manifest body under (name, reference).
57    ///
58    /// If `reference` is a tag, this also records the tag -> digest
59    /// alias. The canonical key is always the digest, so a digest-keyed
60    /// `get_manifest` must succeed after a tag-keyed put.
61    async fn put_manifest(
62        &self,
63        name: &str,
64        reference: &Reference,
65        digest: &Digest,
66        media_type: &str,
67        body: Bytes,
68    ) -> Result<()>;
69
70    /// Fetch a manifest by (name, reference).
71    ///
72    /// Returns `(canonical-digest, media-type, body)`.
73    async fn get_manifest(
74        &self,
75        name: &str,
76        reference: &Reference,
77    ) -> Result<Option<(Digest, String, Bytes)>>;
78
79    /// Delete a manifest by (name, reference).
80    ///
81    /// Returns `true` if something was deleted. The handler layer is
82    /// responsible for rejecting DELETE-by-tag per spec §4.9.
83    async fn delete_manifest(&self, name: &str, reference: &Reference) -> Result<bool>;
84
85    /// List tags for `name`, honouring `n` / `last` pagination.
86    async fn list_tags(
87        &self,
88        name: &str,
89        last: Option<&str>,
90        n: Option<usize>,
91    ) -> Result<Vec<String>>;
92
93    /// List repositories (catalog endpoint).
94    async fn list_repositories(&self, last: Option<&str>, n: Option<usize>) -> Result<Vec<String>>;
95
96    /// Allocate a new upload UUID for `name`.
97    async fn start_upload(&self, name: &str) -> Result<String>;
98
99    /// Append `chunk` to the upload and return the new byte offset.
100    async fn append_upload(&self, name: &str, uuid: &str, offset: u64, chunk: Bytes)
101    -> Result<u64>;
102
103    /// Mark the upload as complete and return the buffered bytes.
104    async fn complete_upload(&self, name: &str, uuid: &str, digest: &Digest) -> Result<()>;
105
106    /// Retrieve the current upload state (without consuming it).
107    async fn get_upload_state(&self, name: &str, uuid: &str) -> Result<Option<UploadState>>;
108
109    /// Cancel and discard an in-flight upload. Returns `true` when the
110    /// upload existed.
111    async fn cancel_upload(&self, name: &str, uuid: &str) -> Result<bool>;
112
113    /// List referrers for `digest` in `name`, filtered by artifact type.
114    async fn list_referrers(
115        &self,
116        name: &str,
117        digest: &Digest,
118        artifact_type: Option<&str>,
119    ) -> Result<Vec<ReferrerDescriptor>>;
120
121    /// Take the accumulated upload buffer. The handler needs this on
122    /// finalize to hand the bytes to the blob store.
123    async fn take_upload_bytes(&self, name: &str, uuid: &str) -> Result<Option<Bytes>>;
124
125    /// Register a referrer descriptor against a subject digest.
126    ///
127    /// Phase 1 uses this from the manifest PUT handler when the
128    /// manifest has a `subject` field, so the referrers API can surface
129    /// it without a scan.
130    async fn register_referrer(
131        &self,
132        name: &str,
133        subject: &Digest,
134        descriptor: ReferrerDescriptor,
135    ) -> Result<()>;
136}
137
/// In-memory `RegistryMeta` impl for tests and single-node deployments.
///
/// All state sits behind a single `parking_lot::RwLock` and nothing is
/// persisted, so everything is lost when the value is dropped.
#[derive(Default)]
pub struct InMemoryRegistryMeta {
    // One lock over every map: read-only operations take the read guard,
    // mutating operations take the write guard.
    inner: RwLock<InMemoryState>,
}
143
144#[derive(Default)]
145struct InMemoryState {
146    // name -> digest-string -> (media-type, body)
147    manifests: BTreeMap<String, BTreeMap<String, (String, Bytes)>>,
148    // name -> tag -> digest-string
149    tags: BTreeMap<String, BTreeMap<String, String>>,
150    // (name, uuid) -> UploadState
151    uploads: BTreeMap<(String, String), UploadState>,
152    // name -> subject-digest-string -> [referrer]
153    referrers: BTreeMap<String, BTreeMap<String, Vec<ReferrerDescriptor>>>,
154}
155
156impl InMemoryRegistryMeta {
157    /// Construct a fresh empty registry.
158    #[must_use]
159    pub fn new() -> Self {
160        Self::default()
161    }
162
163    /// Wrap in an `Arc<dyn RegistryMeta>` for [`crate::router::AppState`].
164    #[must_use]
165    pub fn shared() -> Arc<dyn RegistryMeta> {
166        Arc::new(Self::new())
167    }
168}
169
170#[async_trait]
171impl RegistryMeta for InMemoryRegistryMeta {
172    async fn put_manifest(
173        &self,
174        name: &str,
175        reference: &Reference,
176        digest: &Digest,
177        media_type: &str,
178        body: Bytes,
179    ) -> Result<()> {
180        let mut guard = self.inner.write();
181        guard
182            .manifests
183            .entry(name.to_owned())
184            .or_default()
185            .insert(digest.to_string(), (media_type.to_owned(), body));
186        if let Reference::Tag(tag) = reference {
187            guard
188                .tags
189                .entry(name.to_owned())
190                .or_default()
191                .insert(tag.clone(), digest.to_string());
192        }
193        Ok(())
194    }
195
196    async fn get_manifest(
197        &self,
198        name: &str,
199        reference: &Reference,
200    ) -> Result<Option<(Digest, String, Bytes)>> {
201        let guard = self.inner.read();
202        let Some(name_map) = guard.manifests.get(name) else {
203            return Ok(None);
204        };
205        let digest_str: String = match reference {
206            Reference::Digest(d) => d.to_string(),
207            Reference::Tag(t) => match guard.tags.get(name).and_then(|m| m.get(t)) {
208                Some(s) => s.clone(),
209                None => return Ok(None),
210            },
211        };
212        let Some((media_type, body)) = name_map.get(&digest_str) else {
213            return Ok(None);
214        };
215        let digest: Digest = digest_str
216            .parse()
217            .map_err(ferro_blob_store::BlobStoreError::InvalidDigest)?;
218        Ok(Some((digest, media_type.clone(), body.clone())))
219    }
220
221    async fn delete_manifest(&self, name: &str, reference: &Reference) -> Result<bool> {
222        let mut guard = self.inner.write();
223        match reference {
224            Reference::Digest(d) => {
225                let digest_str = d.to_string();
226                let Some(name_map) = guard.manifests.get_mut(name) else {
227                    return Ok(false);
228                };
229                let removed = name_map.remove(&digest_str).is_some();
230                if removed && let Some(tag_map) = guard.tags.get_mut(name) {
231                    tag_map.retain(|_, v| v != &digest_str);
232                }
233                Ok(removed)
234            }
235            Reference::Tag(_) => Ok(false),
236        }
237    }
238
239    async fn list_tags(
240        &self,
241        name: &str,
242        last: Option<&str>,
243        n: Option<usize>,
244    ) -> Result<Vec<String>> {
245        let guard = self.inner.read();
246        let Some(tag_map) = guard.tags.get(name) else {
247            return Ok(Vec::new());
248        };
249        let mut names: Vec<String> = tag_map.keys().cloned().collect();
250        names.sort();
251        if let Some(cursor) = last {
252            names.retain(|t| t.as_str() > cursor);
253        }
254        if let Some(limit) = n {
255            names.truncate(limit);
256        }
257        Ok(names)
258    }
259
260    async fn list_repositories(&self, last: Option<&str>, n: Option<usize>) -> Result<Vec<String>> {
261        let guard = self.inner.read();
262        let mut names: Vec<String> = guard.manifests.keys().cloned().collect();
263        names.sort();
264        if let Some(cursor) = last {
265            names.retain(|t| t.as_str() > cursor);
266        }
267        if let Some(limit) = n {
268            names.truncate(limit);
269        }
270        Ok(names)
271    }
272
273    async fn start_upload(&self, name: &str) -> Result<String> {
274        let uuid = uuid::Uuid::new_v4().to_string();
275        let mut guard = self.inner.write();
276        guard.uploads.insert(
277            (name.to_owned(), uuid.clone()),
278            UploadState::new(name, uuid.clone()),
279        );
280        Ok(uuid)
281    }
282
283    async fn append_upload(
284        &self,
285        name: &str,
286        uuid: &str,
287        offset: u64,
288        chunk: Bytes,
289    ) -> Result<u64> {
290        let mut guard = self.inner.write();
291        let key = (name.to_owned(), uuid.to_owned());
292        let Some(state) = guard.uploads.get_mut(&key) else {
293            return Err(ferro_blob_store::BlobStoreError::NotFound(format!(
294                "unknown upload uuid: {uuid}"
295            )));
296        };
297        // Spec §4.3: chunked uploads must be sequential — the next
298        // chunk's `Content-Range` start must equal the current offset.
299        if offset != state.offset() {
300            return Err(ferro_blob_store::BlobStoreError::NotFound(format!(
301                "out-of-order upload chunk: expected offset {}, got {offset}",
302                state.offset()
303            )));
304        }
305        Ok(state.append(&chunk))
306    }
307
308    async fn complete_upload(&self, name: &str, uuid: &str, _digest: &Digest) -> Result<()> {
309        let mut guard = self.inner.write();
310        let key = (name.to_owned(), uuid.to_owned());
311        guard.uploads.remove(&key);
312        Ok(())
313    }
314
315    async fn get_upload_state(&self, name: &str, uuid: &str) -> Result<Option<UploadState>> {
316        let guard = self.inner.read();
317        let key = (name.to_owned(), uuid.to_owned());
318        Ok(guard.uploads.get(&key).cloned())
319    }
320
321    async fn cancel_upload(&self, name: &str, uuid: &str) -> Result<bool> {
322        let mut guard = self.inner.write();
323        let key = (name.to_owned(), uuid.to_owned());
324        Ok(guard.uploads.remove(&key).is_some())
325    }
326
327    async fn list_referrers(
328        &self,
329        name: &str,
330        digest: &Digest,
331        artifact_type: Option<&str>,
332    ) -> Result<Vec<ReferrerDescriptor>> {
333        let guard = self.inner.read();
334        let Some(name_map) = guard.referrers.get(name) else {
335            return Ok(Vec::new());
336        };
337        let Some(list) = name_map.get(&digest.to_string()) else {
338            return Ok(Vec::new());
339        };
340        let out = match artifact_type {
341            Some(at) => list
342                .iter()
343                .filter(|d| d.artifact_type.as_deref() == Some(at))
344                .cloned()
345                .collect(),
346            None => list.clone(),
347        };
348        Ok(out)
349    }
350
351    async fn take_upload_bytes(&self, name: &str, uuid: &str) -> Result<Option<Bytes>> {
352        let mut guard = self.inner.write();
353        let key = (name.to_owned(), uuid.to_owned());
354        Ok(guard.uploads.get_mut(&key).map(UploadState::take_bytes))
355    }
356
357    async fn register_referrer(
358        &self,
359        name: &str,
360        subject: &Digest,
361        descriptor: ReferrerDescriptor,
362    ) -> Result<()> {
363        let mut guard = self.inner.write();
364        guard
365            .referrers
366            .entry(name.to_owned())
367            .or_default()
368            .entry(subject.to_string())
369            .or_default()
370            .push(descriptor);
371        Ok(())
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::{InMemoryRegistryMeta, ReferrerDescriptor, RegistryMeta};
    use crate::reference::Reference;
    use crate::upload::UploadState;
    use bytes::Bytes;
    use ferro_blob_store::Digest;

    #[tokio::test]
    async fn start_append_take_cycle() {
        let reg = InMemoryRegistryMeta::new();
        let uuid = reg.start_upload("lib/alpine").await.expect("start");
        let new_off = reg
            .append_upload("lib/alpine", &uuid, 0, Bytes::from_static(b"hello"))
            .await
            .expect("append");
        assert_eq!(new_off, 5);
        let state: UploadState = reg
            .get_upload_state("lib/alpine", &uuid)
            .await
            .expect("get")
            .expect("state present");
        assert_eq!(state.offset(), 5);
        let body = reg
            .take_upload_bytes("lib/alpine", &uuid)
            .await
            .expect("take")
            .expect("bytes present");
        assert_eq!(&body[..], b"hello");
    }

    #[tokio::test]
    async fn out_of_order_chunk_is_rejected() {
        let reg = InMemoryRegistryMeta::new();
        let uuid = reg.start_upload("lib/alpine").await.expect("start");
        reg.append_upload("lib/alpine", &uuid, 0, Bytes::from_static(b"ab"))
            .await
            .expect("first chunk");
        let err = reg
            .append_upload("lib/alpine", &uuid, 10, Bytes::from_static(b"cd"))
            .await
            .expect_err("out-of-order chunk must fail");
        assert!(matches!(err, ferro_blob_store::BlobStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn cancel_upload_discards_session() {
        let reg = InMemoryRegistryMeta::new();
        let uuid = reg.start_upload("lib/alpine").await.expect("start");
        assert!(reg.cancel_upload("lib/alpine", &uuid).await.expect("cancel"));
        let state = reg
            .get_upload_state("lib/alpine", &uuid)
            .await
            .expect("get");
        assert!(state.is_none());
        assert!(
            !reg.cancel_upload("lib/alpine", &uuid)
                .await
                .expect("second cancel")
        );
        let err = reg
            .append_upload("lib/alpine", &uuid, 0, Bytes::from_static(b"x"))
            .await
            .expect_err("append to a cancelled upload must fail");
        assert!(matches!(err, ferro_blob_store::BlobStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn manifest_put_and_lookup_by_digest_and_tag() {
        let reg = InMemoryRegistryMeta::new();
        let digest = Digest::sha256_of(b"manifest-body");
        reg.put_manifest(
            "lib/alpine",
            &Reference::Tag("latest".to_owned()),
            &digest,
            "application/vnd.oci.image.manifest.v1+json",
            Bytes::from_static(b"manifest-body"),
        )
        .await
        .expect("put manifest");
        let by_tag = reg
            .get_manifest("lib/alpine", &Reference::Tag("latest".to_owned()))
            .await
            .expect("get by tag")
            .expect("present");
        assert_eq!(by_tag.0, digest);
        let by_digest = reg
            .get_manifest("lib/alpine", &Reference::Digest(digest.clone()))
            .await
            .expect("get by digest")
            .expect("present");
        assert_eq!(by_digest.0, digest);
    }

    #[tokio::test]
    async fn delete_by_tag_returns_false() {
        let reg = InMemoryRegistryMeta::new();
        let digest = Digest::sha256_of(b"manifest-body");
        reg.put_manifest(
            "lib/alpine",
            &Reference::Tag("latest".to_owned()),
            &digest,
            "application/vnd.oci.image.manifest.v1+json",
            Bytes::from_static(b"manifest-body"),
        )
        .await
        .expect("put manifest");
        let removed = reg
            .delete_manifest("lib/alpine", &Reference::Tag("latest".to_owned()))
            .await
            .expect("delete by tag");
        assert!(!removed);
    }

    #[tokio::test]
    async fn delete_by_digest_removes_manifest_and_its_tags() {
        let reg = InMemoryRegistryMeta::new();
        let digest = Digest::sha256_of(b"manifest-body");
        reg.put_manifest(
            "lib/alpine",
            &Reference::Tag("latest".to_owned()),
            &digest,
            "application/vnd.oci.image.manifest.v1+json",
            Bytes::from_static(b"manifest-body"),
        )
        .await
        .expect("put manifest");
        let removed = reg
            .delete_manifest("lib/alpine", &Reference::Digest(digest.clone()))
            .await
            .expect("delete by digest");
        assert!(removed);
        let by_digest = reg
            .get_manifest("lib/alpine", &Reference::Digest(digest.clone()))
            .await
            .expect("get by digest");
        assert!(by_digest.is_none());
        let by_tag = reg
            .get_manifest("lib/alpine", &Reference::Tag("latest".to_owned()))
            .await
            .expect("get by tag");
        assert!(by_tag.is_none());
        let tags = reg
            .list_tags("lib/alpine", None, None)
            .await
            .expect("list tags");
        assert!(tags.is_empty());
        let again = reg
            .delete_manifest("lib/alpine", &Reference::Digest(digest))
            .await
            .expect("second delete");
        assert!(!again);
    }

    #[tokio::test]
    async fn list_tags_paginates_in_lexical_order() {
        let reg = InMemoryRegistryMeta::new();
        let digest = Digest::sha256_of(b"manifest-body");
        for tag in ["v3", "v1", "latest", "v2"] {
            reg.put_manifest(
                "lib/alpine",
                &Reference::Tag(tag.to_owned()),
                &digest,
                "application/vnd.oci.image.manifest.v1+json",
                Bytes::from_static(b"manifest-body"),
            )
            .await
            .expect("put manifest");
        }
        let first_page = reg
            .list_tags("lib/alpine", None, Some(2))
            .await
            .expect("first page");
        assert_eq!(first_page, ["latest", "v1"]);
        let second_page = reg
            .list_tags("lib/alpine", Some("v1"), None)
            .await
            .expect("second page");
        assert_eq!(second_page, ["v2", "v3"]);
        let unknown = reg
            .list_tags("lib/unknown", None, None)
            .await
            .expect("unknown repo");
        assert!(unknown.is_empty());
    }

    #[tokio::test]
    async fn list_repositories_paginates_in_lexical_order() {
        let reg = InMemoryRegistryMeta::new();
        let digest = Digest::sha256_of(b"manifest-body");
        for repo in ["lib/nginx", "lib/alpine", "lib/busybox"] {
            reg.put_manifest(
                repo,
                &Reference::Tag("latest".to_owned()),
                &digest,
                "application/vnd.oci.image.manifest.v1+json",
                Bytes::from_static(b"manifest-body"),
            )
            .await
            .expect("put manifest");
        }
        let all = reg
            .list_repositories(None, None)
            .await
            .expect("list all");
        assert_eq!(all, ["lib/alpine", "lib/busybox", "lib/nginx"]);
        let page = reg
            .list_repositories(Some("lib/alpine"), Some(1))
            .await
            .expect("page");
        assert_eq!(page, ["lib/busybox"]);
    }

    #[tokio::test]
    async fn referrers_filter_by_artifact_type() {
        let reg = InMemoryRegistryMeta::new();
        let subject = Digest::sha256_of(b"subject");
        let d1 = Digest::sha256_of(b"sbom");
        let d2 = Digest::sha256_of(b"sig");
        reg.register_referrer(
            "lib/alpine",
            &subject,
            ReferrerDescriptor {
                media_type: "application/vnd.oci.image.manifest.v1+json".to_owned(),
                digest: d1,
                size: 10,
                artifact_type: Some("application/spdx+json".to_owned()),
                annotations: None,
            },
        )
        .await
        .expect("register sbom referrer");
        reg.register_referrer(
            "lib/alpine",
            &subject,
            ReferrerDescriptor {
                media_type: "application/vnd.oci.image.manifest.v1+json".to_owned(),
                digest: d2,
                size: 10,
                artifact_type: Some("application/vnd.dev.cosign.sig".to_owned()),
                annotations: None,
            },
        )
        .await
        .expect("register sig referrer");
        let all = reg
            .list_referrers("lib/alpine", &subject, None)
            .await
            .expect("list all");
        assert_eq!(all.len(), 2);
        let sboms = reg
            .list_referrers("lib/alpine", &subject, Some("application/spdx+json"))
            .await
            .expect("list sboms");
        assert_eq!(sboms.len(), 1);
    }
}