1use std::collections::BTreeMap;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use ferro_blob_store::{Digest, Result};
22use parking_lot::RwLock;
23use serde::{Deserialize, Serialize};
24
25use crate::reference::Reference;
26use crate::upload::UploadState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33pub struct ReferrerDescriptor {
34 #[serde(rename = "mediaType")]
36 pub media_type: String,
37 pub digest: Digest,
39 pub size: u64,
41 #[serde(
43 rename = "artifactType",
44 default,
45 skip_serializing_if = "Option::is_none"
46 )]
47 pub artifact_type: Option<String>,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub annotations: Option<BTreeMap<String, String>>,
51}
52
/// Metadata backend of a registry: manifests, tags, upload sessions and
/// referrers, all scoped by repository `name`.
///
/// Implementations must be `Send + Sync`; [`InMemoryRegistryMeta::shared`]
/// hands one out as an `Arc<dyn RegistryMeta>`.
///
/// NOTE(review): the per-method notes below describe what the signatures and
/// the in-memory implementation in this file show; confirm the exact contract
/// before relying on it for other backends.
#[async_trait]
pub trait RegistryMeta: Send + Sync {
    /// Stores the manifest `body` with its `media_type` under `digest` in
    /// repository `name`.
    ///
    /// In the in-memory implementation, a tag `reference` is additionally
    /// pointed at `digest`.
    async fn put_manifest(
        &self,
        name: &str,
        reference: &Reference,
        digest: &Digest,
        media_type: &str,
        body: Bytes,
    ) -> Result<()>;

    /// Looks up a manifest by tag or digest.
    ///
    /// Returns `Ok(Some((digest, media_type, body)))` when found and
    /// `Ok(None)` when the repository, tag or digest is unknown.
    async fn get_manifest(
        &self,
        name: &str,
        reference: &Reference,
    ) -> Result<Option<(Digest, String, Bytes)>>;

    /// Deletes a manifest and reports whether anything was removed.
    ///
    /// The in-memory implementation only deletes by digest; a tag reference
    /// yields `Ok(false)`.
    async fn delete_manifest(&self, name: &str, reference: &Reference) -> Result<bool>;

    /// Lists tags of repository `name`.
    ///
    /// `last` is an exclusive pagination cursor and `n` an optional maximum
    /// number of entries. The in-memory implementation returns tags in
    /// ascending order and an empty list for an unknown repository.
    async fn list_tags(
        &self,
        name: &str,
        last: Option<&str>,
        n: Option<usize>,
    ) -> Result<Vec<String>>;

    /// Lists repository names, paginated with `last` / `n` in the same way as
    /// [`RegistryMeta::list_tags`].
    async fn list_repositories(&self, last: Option<&str>, n: Option<usize>) -> Result<Vec<String>>;

    /// Opens a new upload session in repository `name` and returns its
    /// identifier (a UUID string in the in-memory implementation).
    async fn start_upload(&self, name: &str) -> Result<String>;

    /// Appends `chunk` to upload session `uuid` at `offset`.
    ///
    /// The in-memory implementation fails when the session is unknown or when
    /// `offset` differs from the session's current offset, and otherwise
    /// returns the value reported by `UploadState::append` (the new offset,
    /// per the tests in this file).
    async fn append_upload(&self, name: &str, uuid: &str, offset: u64, chunk: Bytes)
    -> Result<u64>;

    /// Finishes upload session `uuid`.
    ///
    /// The in-memory implementation simply forgets the session and does not
    /// look at `digest`.
    async fn complete_upload(&self, name: &str, uuid: &str, digest: &Digest) -> Result<()>;

    /// Returns the state of upload session `uuid`, or `Ok(None)` if there is
    /// no such session.
    async fn get_upload_state(&self, name: &str, uuid: &str) -> Result<Option<UploadState>>;

    /// Cancels upload session `uuid`; returns whether a session was removed.
    async fn cancel_upload(&self, name: &str, uuid: &str) -> Result<bool>;

    /// Lists the descriptors registered as referrers of `digest` in
    /// repository `name`.
    ///
    /// When `artifact_type` is `Some`, only descriptors whose `artifact_type`
    /// equals it are returned.
    async fn list_referrers(
        &self,
        name: &str,
        digest: &Digest,
        artifact_type: Option<&str>,
    ) -> Result<Vec<ReferrerDescriptor>>;

    /// Takes the bytes accumulated by upload session `uuid`, or returns
    /// `Ok(None)` if there is no such session.
    async fn take_upload_bytes(&self, name: &str, uuid: &str) -> Result<Option<Bytes>>;

    /// Records `descriptor` as a referrer of the manifest `subject` in
    /// repository `name`.
    async fn register_referrer(
        &self,
        name: &str,
        subject: &Digest,
        descriptor: ReferrerDescriptor,
    ) -> Result<()>;
}
137
/// [`RegistryMeta`] implementation that keeps all state in process memory
/// behind a single read/write lock.
#[derive(Default)]
pub struct InMemoryRegistryMeta {
    /// All registry state; read-only operations take the read lock, mutating
    /// operations the write lock.
    inner: RwLock<InMemoryState>,
}
143
/// Plain data held by [`InMemoryRegistryMeta`] under its lock.
#[derive(Default)]
struct InMemoryState {
    /// Repository name -> digest string -> (media type, manifest body).
    manifests: BTreeMap<String, BTreeMap<String, (String, Bytes)>>,
    /// Repository name -> tag -> digest string the tag points at.
    tags: BTreeMap<String, BTreeMap<String, String>>,
    /// (repository name, upload uuid) -> state of that upload session.
    uploads: BTreeMap<(String, String), UploadState>,
    /// Repository name -> subject digest string -> descriptors referring to it.
    referrers: BTreeMap<String, BTreeMap<String, Vec<ReferrerDescriptor>>>,
}
155
156impl InMemoryRegistryMeta {
157 #[must_use]
159 pub fn new() -> Self {
160 Self::default()
161 }
162
163 #[must_use]
165 pub fn shared() -> Arc<dyn RegistryMeta> {
166 Arc::new(Self::new())
167 }
168}
169
170#[async_trait]
171impl RegistryMeta for InMemoryRegistryMeta {
172 async fn put_manifest(
173 &self,
174 name: &str,
175 reference: &Reference,
176 digest: &Digest,
177 media_type: &str,
178 body: Bytes,
179 ) -> Result<()> {
180 let mut guard = self.inner.write();
181 guard
182 .manifests
183 .entry(name.to_owned())
184 .or_default()
185 .insert(digest.to_string(), (media_type.to_owned(), body));
186 if let Reference::Tag(tag) = reference {
187 guard
188 .tags
189 .entry(name.to_owned())
190 .or_default()
191 .insert(tag.clone(), digest.to_string());
192 }
193 Ok(())
194 }
195
196 async fn get_manifest(
197 &self,
198 name: &str,
199 reference: &Reference,
200 ) -> Result<Option<(Digest, String, Bytes)>> {
201 let guard = self.inner.read();
202 let Some(name_map) = guard.manifests.get(name) else {
203 return Ok(None);
204 };
205 let digest_str: String = match reference {
206 Reference::Digest(d) => d.to_string(),
207 Reference::Tag(t) => match guard.tags.get(name).and_then(|m| m.get(t)) {
208 Some(s) => s.clone(),
209 None => return Ok(None),
210 },
211 };
212 let Some((media_type, body)) = name_map.get(&digest_str) else {
213 return Ok(None);
214 };
215 let digest: Digest = digest_str
216 .parse()
217 .map_err(ferro_blob_store::BlobStoreError::InvalidDigest)?;
218 Ok(Some((digest, media_type.clone(), body.clone())))
219 }
220
221 async fn delete_manifest(&self, name: &str, reference: &Reference) -> Result<bool> {
222 let mut guard = self.inner.write();
223 match reference {
224 Reference::Digest(d) => {
225 let digest_str = d.to_string();
226 let Some(name_map) = guard.manifests.get_mut(name) else {
227 return Ok(false);
228 };
229 let removed = name_map.remove(&digest_str).is_some();
230 if removed && let Some(tag_map) = guard.tags.get_mut(name) {
231 tag_map.retain(|_, v| v != &digest_str);
232 }
233 Ok(removed)
234 }
235 Reference::Tag(_) => Ok(false),
236 }
237 }
238
239 async fn list_tags(
240 &self,
241 name: &str,
242 last: Option<&str>,
243 n: Option<usize>,
244 ) -> Result<Vec<String>> {
245 let guard = self.inner.read();
246 let Some(tag_map) = guard.tags.get(name) else {
247 return Ok(Vec::new());
248 };
249 let mut names: Vec<String> = tag_map.keys().cloned().collect();
250 names.sort();
251 if let Some(cursor) = last {
252 names.retain(|t| t.as_str() > cursor);
253 }
254 if let Some(limit) = n {
255 names.truncate(limit);
256 }
257 Ok(names)
258 }
259
260 async fn list_repositories(&self, last: Option<&str>, n: Option<usize>) -> Result<Vec<String>> {
261 let guard = self.inner.read();
262 let mut names: Vec<String> = guard.manifests.keys().cloned().collect();
263 names.sort();
264 if let Some(cursor) = last {
265 names.retain(|t| t.as_str() > cursor);
266 }
267 if let Some(limit) = n {
268 names.truncate(limit);
269 }
270 Ok(names)
271 }
272
273 async fn start_upload(&self, name: &str) -> Result<String> {
274 let uuid = uuid::Uuid::new_v4().to_string();
275 let mut guard = self.inner.write();
276 guard.uploads.insert(
277 (name.to_owned(), uuid.clone()),
278 UploadState::new(name, uuid.clone()),
279 );
280 Ok(uuid)
281 }
282
283 async fn append_upload(
284 &self,
285 name: &str,
286 uuid: &str,
287 offset: u64,
288 chunk: Bytes,
289 ) -> Result<u64> {
290 let mut guard = self.inner.write();
291 let key = (name.to_owned(), uuid.to_owned());
292 let Some(state) = guard.uploads.get_mut(&key) else {
293 return Err(ferro_blob_store::BlobStoreError::NotFound(format!(
294 "unknown upload uuid: {uuid}"
295 )));
296 };
297 if offset != state.offset() {
300 return Err(ferro_blob_store::BlobStoreError::NotFound(format!(
301 "out-of-order upload chunk: expected offset {}, got {offset}",
302 state.offset()
303 )));
304 }
305 Ok(state.append(&chunk))
306 }
307
308 async fn complete_upload(&self, name: &str, uuid: &str, _digest: &Digest) -> Result<()> {
309 let mut guard = self.inner.write();
310 let key = (name.to_owned(), uuid.to_owned());
311 guard.uploads.remove(&key);
312 Ok(())
313 }
314
315 async fn get_upload_state(&self, name: &str, uuid: &str) -> Result<Option<UploadState>> {
316 let guard = self.inner.read();
317 let key = (name.to_owned(), uuid.to_owned());
318 Ok(guard.uploads.get(&key).cloned())
319 }
320
321 async fn cancel_upload(&self, name: &str, uuid: &str) -> Result<bool> {
322 let mut guard = self.inner.write();
323 let key = (name.to_owned(), uuid.to_owned());
324 Ok(guard.uploads.remove(&key).is_some())
325 }
326
327 async fn list_referrers(
328 &self,
329 name: &str,
330 digest: &Digest,
331 artifact_type: Option<&str>,
332 ) -> Result<Vec<ReferrerDescriptor>> {
333 let guard = self.inner.read();
334 let Some(name_map) = guard.referrers.get(name) else {
335 return Ok(Vec::new());
336 };
337 let Some(list) = name_map.get(&digest.to_string()) else {
338 return Ok(Vec::new());
339 };
340 let out = match artifact_type {
341 Some(at) => list
342 .iter()
343 .filter(|d| d.artifact_type.as_deref() == Some(at))
344 .cloned()
345 .collect(),
346 None => list.clone(),
347 };
348 Ok(out)
349 }
350
351 async fn take_upload_bytes(&self, name: &str, uuid: &str) -> Result<Option<Bytes>> {
352 let mut guard = self.inner.write();
353 let key = (name.to_owned(), uuid.to_owned());
354 Ok(guard.uploads.get_mut(&key).map(UploadState::take_bytes))
355 }
356
357 async fn register_referrer(
358 &self,
359 name: &str,
360 subject: &Digest,
361 descriptor: ReferrerDescriptor,
362 ) -> Result<()> {
363 let mut guard = self.inner.write();
364 guard
365 .referrers
366 .entry(name.to_owned())
367 .or_default()
368 .entry(subject.to_string())
369 .or_default()
370 .push(descriptor);
371 Ok(())
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::{InMemoryRegistryMeta, ReferrerDescriptor, RegistryMeta};
    use crate::reference::Reference;
    use crate::upload::UploadState;
    use bytes::Bytes;
    use ferro_blob_store::Digest;

    /// Repository name used by every test.
    const REPO: &str = "lib/alpine";
    /// Media type used for every manifest and referrer descriptor.
    const MANIFEST_MEDIA_TYPE: &str = "application/vnd.oci.image.manifest.v1+json";

    /// The `latest` tag reference.
    fn latest() -> Reference {
        Reference::Tag("latest".to_owned())
    }

    /// Builds a fresh registry holding one manifest tagged `latest` and
    /// returns it together with the manifest digest.
    async fn registry_with_latest() -> (InMemoryRegistryMeta, Digest) {
        let reg = InMemoryRegistryMeta::new();
        let digest = Digest::sha256_of(b"manifest-body");
        reg.put_manifest(
            REPO,
            &latest(),
            &digest,
            MANIFEST_MEDIA_TYPE,
            Bytes::from_static(b"manifest-body"),
        )
        .await
        .expect("put manifest");
        (reg, digest)
    }

    /// Builds a size-10 referrer descriptor without annotations.
    fn referrer(digest: Digest, artifact_type: &str) -> ReferrerDescriptor {
        ReferrerDescriptor {
            media_type: MANIFEST_MEDIA_TYPE.to_owned(),
            digest,
            size: 10,
            artifact_type: Some(artifact_type.to_owned()),
            annotations: None,
        }
    }

    /// A chunk appended at offset 0 advances the offset and can be taken back.
    #[tokio::test]
    async fn start_append_take_cycle() {
        let reg = InMemoryRegistryMeta::new();
        let uuid = reg.start_upload(REPO).await.expect("start");

        let new_off = reg
            .append_upload(REPO, &uuid, 0, Bytes::from_static(b"hello"))
            .await
            .expect("append");
        assert_eq!(new_off, 5);

        let state: UploadState = reg
            .get_upload_state(REPO, &uuid)
            .await
            .expect("get")
            .expect("state present");
        assert_eq!(state.offset(), 5);

        let body = reg
            .take_upload_bytes(REPO, &uuid)
            .await
            .expect("take")
            .expect("bytes present");
        assert_eq!(&body[..], b"hello");
    }

    /// A chunk whose offset skips ahead is refused with `NotFound`.
    #[tokio::test]
    async fn out_of_order_chunk_is_rejected() {
        let reg = InMemoryRegistryMeta::new();
        let uuid = reg.start_upload(REPO).await.expect("start");
        reg.append_upload(REPO, &uuid, 0, Bytes::from_static(b"ab"))
            .await
            .expect("first chunk");

        let err = reg
            .append_upload(REPO, &uuid, 10, Bytes::from_static(b"cd"))
            .await
            .expect_err("out-of-order chunk must fail");
        assert!(matches!(err, ferro_blob_store::BlobStoreError::NotFound(_)));
    }

    /// A manifest pushed by tag is retrievable by that tag and by its digest.
    #[tokio::test]
    async fn manifest_put_and_lookup_by_digest_and_tag() {
        let (reg, digest) = registry_with_latest().await;

        let by_tag = reg
            .get_manifest(REPO, &latest())
            .await
            .expect("get by tag")
            .expect("present");
        assert_eq!(by_tag.0, digest);

        let by_digest = reg
            .get_manifest(REPO, &Reference::Digest(digest.clone()))
            .await
            .expect("get by digest")
            .expect("present");
        assert_eq!(by_digest.0, digest);
    }

    /// Deleting through a tag reference removes nothing and reports `false`.
    #[tokio::test]
    async fn delete_by_tag_returns_false() {
        let (reg, _digest) = registry_with_latest().await;

        let removed = reg
            .delete_manifest(REPO, &latest())
            .await
            .expect("delete by tag");
        assert!(!removed);
    }

    /// `list_referrers` returns everything without a filter and only the
    /// matching descriptors with one.
    #[tokio::test]
    async fn referrers_filter_by_artifact_type() {
        let reg = InMemoryRegistryMeta::new();
        let subject = Digest::sha256_of(b"subject");
        let d1 = Digest::sha256_of(b"sbom");
        let d2 = Digest::sha256_of(b"sig");

        reg.register_referrer(REPO, &subject, referrer(d1, "application/spdx+json"))
            .await
            .expect("register sbom referrer");
        reg.register_referrer(
            REPO,
            &subject,
            referrer(d2, "application/vnd.dev.cosign.sig"),
        )
        .await
        .expect("register sig referrer");

        let all = reg
            .list_referrers(REPO, &subject, None)
            .await
            .expect("list all");
        assert_eq!(all.len(), 2);

        let sboms = reg
            .list_referrers(REPO, &subject, Some("application/spdx+json"))
            .await
            .expect("list sboms");
        assert_eq!(sboms.len(), 1);
    }
}