1use super::pinned_registry as canonical;
4use crate::contract::scope::{ScopeError, ScopeId};
5use async_trait::async_trait;
6use serde::de::DeserializeOwned;
7use serde::{Deserialize, Serialize};
8use serde_json::{Value, json};
9use sha2::{Digest, Sha256};
10use std::marker::PhantomData;
11use std::sync::Arc;
12use thiserror::Error;
13
14#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
16pub struct VersionRef {
17 pub kind: String,
18 pub id: String,
19 pub version: u64,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24pub struct VersionedRecord<T> {
25 pub kind: String,
26 pub id: String,
27 pub version: u64,
28 pub content_hash: String,
29 pub value_schema_version: u32,
30 pub value: T,
31 pub canonical_json_bytes: Vec<u8>,
32 pub created_at_ms: u64,
33 pub metadata: Value,
34}
35
36impl<T> VersionedRecord<T> {
37 pub fn verify_content_hash(&self) -> Result<(), VersionedRegistryError> {
43 let digest = Sha256::digest(&self.canonical_json_bytes);
44 let actual = format!("sha256:{digest:x}");
45 if actual != self.content_hash {
46 return Err(VersionedRegistryError::Backend(format!(
47 "stored content_hash {stored} does not match recomputed digest {actual} \
48 for {kind}/{id}@{version}",
49 stored = self.content_hash,
50 kind = self.kind,
51 id = self.id,
52 version = self.version,
53 )));
54 }
55 Ok(())
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
61pub enum PublishOutcome<T> {
62 Created(VersionedRecord<T>),
63 Noop(VersionedRecord<T>),
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
68pub struct RegistryResourcePublish {
69 pub kind: String,
70 pub id: String,
71 pub value: Value,
72 pub value_schema_version: u32,
73 pub metadata: Value,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
78pub struct VersionedResourceState {
79 pub scope_id: String,
80 pub kind: String,
81 pub id: String,
82 pub current_version: Option<u64>,
83 #[serde(default, skip_serializing_if = "Option::is_none")]
84 pub archived_at_ms: Option<u64>,
85 pub created_at_ms: u64,
86 pub updated_at_ms: u64,
87 pub metadata: Value,
88}
89
90impl VersionedResourceState {
91 pub fn ensure_not_archived(&self, kind: &str, id: &str) -> Result<(), VersionedRegistryError> {
95 if self.archived_at_ms.is_some() {
96 Err(VersionedRegistryError::Archived {
97 kind: kind.to_string(),
98 id: id.to_string(),
99 })
100 } else {
101 Ok(())
102 }
103 }
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
108pub struct ConfigRevisionRef {
109 pub namespace: String,
110 pub id: String,
111 pub revision: u64,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
116pub struct RegistryPublication {
117 pub publication_id: String,
118 pub scope_id: String,
119 pub snapshot_version: u64,
120 pub entries: Vec<VersionRef>,
121 #[serde(default)]
122 pub source_config_revisions: Vec<ConfigRevisionRef>,
123 #[serde(default, skip_serializing_if = "Option::is_none")]
124 pub created_by: Option<String>,
125 pub created_at_ms: u64,
126 pub metadata: Value,
127}
128
129pub use super::pinned_registry::{PinnedRegistryEntry, PinnedRegistryManifest};
130
131#[derive(Debug, Error)]
136#[non_exhaustive]
137pub enum VersionedRegistryError {
138 #[error("not found: {0}")]
139 NotFound(String),
140 #[error("already archived: {kind}/{id}")]
141 Archived { kind: String, id: String },
142 #[error("already exists: {0}")]
143 AlreadyExists(String),
144 #[error("invalid request: {0}")]
145 InvalidRequest(String),
146 #[error(
147 "incompatible schema: {kind}/{id}@{version} stored as schema v{stored} but reader \
148 supports {supported:?}"
149 )]
150 IncompatibleSchema {
151 kind: String,
152 id: String,
153 version: u64,
154 stored: u32,
155 supported: Vec<u32>,
156 },
157 #[error("serialization error: {0}")]
158 Serialization(String),
159 #[error("versioned registry error: {0}")]
160 Backend(String),
161}
162
163impl From<canonical::PinnedRegistryHashError> for VersionedRegistryError {
164 fn from(error: canonical::PinnedRegistryHashError) -> Self {
165 use canonical::PinnedRegistryHashError as Hash;
166 match error {
167 Hash::Serialization(message) => VersionedRegistryError::Serialization(message),
168 Hash::InvalidRequest(message) => VersionedRegistryError::InvalidRequest(message),
169 }
170 }
171}
172
173pub fn canonical_registry_json_bytes(
179 value_schema_version: u32,
180 value: &Value,
181) -> Result<Vec<u8>, VersionedRegistryError> {
182 Ok(canonical::canonical_registry_json_bytes(
183 value_schema_version,
184 value,
185 )?)
186}
187
188#[derive(Debug, Clone, Default)]
192pub struct RegistryRetentionPolicy {
193 pub keep_last_versions: Option<u64>,
197 pub keep_younger_than_ms: Option<u64>,
200 pub protected_versions: Vec<VersionRef>,
204}
205
206#[async_trait]
211pub trait VersionedRegistryRetention: Send + Sync {
212 async fn purge_eligible_versions(
217 &self,
218 scope_id: &str,
219 now_ms: u64,
220 policy: RegistryRetentionPolicy,
221 dry_run: bool,
222 ) -> Result<Vec<VersionRef>, VersionedRegistryError>;
223}
224
225#[must_use]
230pub fn sort_publication_entries(mut entries: Vec<VersionRef>) -> Vec<VersionRef> {
231 entries.sort_by(|a, b| a.kind.cmp(&b.kind).then_with(|| a.id.cmp(&b.id)));
232 entries
233}
234
235pub fn build_rollback_metadata(
241 metadata: Value,
242 to_version: u64,
243) -> Result<Value, VersionedRegistryError> {
244 let mut object = match metadata {
245 Value::Null => serde_json::Map::new(),
246 Value::Object(map) => map,
247 other => {
248 return Err(VersionedRegistryError::InvalidRequest(format!(
249 "rollback metadata must be a JSON object or null, got {}",
250 value_kind_name(&other)
251 )));
252 }
253 };
254 let expected = serde_json::json!(to_version);
255 if let Some(existing) = object.get("restored_from")
256 && existing != &expected
257 {
258 return Err(VersionedRegistryError::InvalidRequest(format!(
259 "rollback metadata.restored_from must be {to_version}, got {existing}"
260 )));
261 }
262 object.insert("restored_from".to_string(), expected);
263 Ok(Value::Object(object))
264}
265
266fn value_kind_name(value: &Value) -> &'static str {
267 match value {
268 Value::Null => "null",
269 Value::Bool(_) => "bool",
270 Value::Number(_) => "number",
271 Value::String(_) => "string",
272 Value::Array(_) => "array",
273 Value::Object(_) => "object",
274 }
275}
276
277pub fn registry_content_hash(
282 value_schema_version: u32,
283 value: &Value,
284) -> Result<(String, Vec<u8>), VersionedRegistryError> {
285 Ok(canonical::registry_content_hash(
286 value_schema_version,
287 value,
288 )?)
289}
290
291#[async_trait]
293pub trait VersionedRegistryStore: Send + Sync {
294 async fn resource_state(
295 &self,
296 scope_id: &str,
297 kind: &str,
298 id: &str,
299 ) -> Result<Option<VersionedResourceState>, VersionedRegistryError>;
300
301 async fn current(
302 &self,
303 scope_id: &str,
304 kind: &str,
305 id: &str,
306 ) -> Result<Option<VersionedRecord<Value>>, VersionedRegistryError>;
307
308 async fn get(
309 &self,
310 scope_id: &str,
311 kind: &str,
312 id: &str,
313 version: u64,
314 ) -> Result<Option<VersionedRecord<Value>>, VersionedRegistryError>;
315
316 async fn list_versions(
317 &self,
318 scope_id: &str,
319 kind: &str,
320 id: &str,
321 ) -> Result<Vec<VersionedRecord<Value>>, VersionedRegistryError>;
322
323 async fn publish_resource(
324 &self,
325 scope_id: &str,
326 kind: &str,
327 id: &str,
328 value: Value,
329 value_schema_version: u32,
330 metadata: Value,
331 ) -> Result<PublishOutcome<Value>, VersionedRegistryError>;
332
333 async fn rollback_resource(
334 &self,
335 scope_id: &str,
336 kind: &str,
337 id: &str,
338 to_version: u64,
339 metadata: Value,
340 ) -> Result<VersionedRecord<Value>, VersionedRegistryError>;
341
342 async fn rollback_publication(
343 &self,
344 scope_id: &str,
345 source_snapshot_version: u64,
346 publication_id: &str,
347 created_by: Option<String>,
348 metadata: Value,
349 ) -> Result<RegistryPublication, VersionedRegistryError> {
350 if publication_id.trim().is_empty() {
351 return Err(VersionedRegistryError::InvalidRequest(
352 "publication_id cannot be empty".to_string(),
353 ));
354 }
355 let source_publication = self
356 .get_publication(scope_id, source_snapshot_version)
357 .await?
358 .ok_or_else(|| {
359 VersionedRegistryError::NotFound(format!(
360 "publication/{scope_id}@{source_snapshot_version}"
361 ))
362 })?;
363
364 let mut resources = Vec::with_capacity(source_publication.entries.len());
365 for entry in &source_publication.entries {
366 let record = self
367 .get(scope_id, &entry.kind, &entry.id, entry.version)
368 .await?
369 .ok_or_else(|| {
370 VersionedRegistryError::NotFound(format!(
371 "{}/{}@{}",
372 entry.kind, entry.id, entry.version
373 ))
374 })?;
375 resources.push(RegistryResourcePublish {
376 kind: entry.kind.clone(),
377 id: entry.id.clone(),
378 value: record.value,
379 value_schema_version: record.value_schema_version,
380 metadata: json!({
381 "rollback_source_publication_id": source_publication.publication_id.clone(),
382 "rollback_source_snapshot_version": source_publication.snapshot_version,
383 "rollback_source_version": entry.version,
384 "rollback_source_content_hash": record.content_hash,
385 }),
386 });
387 }
388
389 self.publish_resources_and_create_publication(
390 scope_id,
391 publication_id,
392 resources,
393 source_publication.source_config_revisions,
394 created_by,
395 metadata,
396 )
397 .await
398 }
399
400 async fn archive_resource(
401 &self,
402 scope_id: &str,
403 kind: &str,
404 id: &str,
405 ) -> Result<(), VersionedRegistryError>;
406
407 async fn unarchive_resource(
408 &self,
409 scope_id: &str,
410 kind: &str,
411 id: &str,
412 ) -> Result<(), VersionedRegistryError>;
413
414 async fn publish_resources_and_create_publication(
415 &self,
416 scope_id: &str,
417 publication_id: &str,
418 resources: Vec<RegistryResourcePublish>,
419 source_config_revisions: Vec<ConfigRevisionRef>,
420 created_by: Option<String>,
421 metadata: Value,
422 ) -> Result<RegistryPublication, VersionedRegistryError>;
423
424 async fn create_publication(
425 &self,
426 scope_id: &str,
427 publication_id: &str,
428 entries: Vec<VersionRef>,
429 source_config_revisions: Vec<ConfigRevisionRef>,
430 created_by: Option<String>,
431 metadata: Value,
432 ) -> Result<RegistryPublication, VersionedRegistryError>;
433
434 async fn latest_publication(
435 &self,
436 scope_id: &str,
437 ) -> Result<Option<RegistryPublication>, VersionedRegistryError>;
438
439 async fn get_publication(
440 &self,
441 scope_id: &str,
442 snapshot_version: u64,
443 ) -> Result<Option<RegistryPublication>, VersionedRegistryError>;
444
445 async fn pinned_manifest_for_publication(
446 &self,
447 scope_id: &str,
448 snapshot_version: u64,
449 ) -> Result<Option<PinnedRegistryManifest>, VersionedRegistryError> {
450 let Some(publication) = self.get_publication(scope_id, snapshot_version).await? else {
451 return Ok(None);
452 };
453 let mut entries = Vec::with_capacity(publication.entries.len());
454 for entry in &publication.entries {
455 let record = self
456 .get(scope_id, &entry.kind, &entry.id, entry.version)
457 .await?
458 .ok_or_else(|| {
459 VersionedRegistryError::NotFound(format!(
460 "{}/{}@{}",
461 entry.kind, entry.id, entry.version
462 ))
463 })?;
464 entries.push(PinnedRegistryEntry {
465 kind: entry.kind.clone(),
466 id: entry.id.clone(),
467 version: entry.version,
468 content_hash: record.content_hash,
469 });
470 }
471 Ok(Some(PinnedRegistryManifest {
472 publication_id: Some(publication.publication_id),
473 registry_snapshot_version: Some(publication.snapshot_version),
474 entries,
475 }))
476 }
477
478 async fn latest_pinned_manifest(
479 &self,
480 scope_id: &str,
481 ) -> Result<Option<PinnedRegistryManifest>, VersionedRegistryError> {
482 let Some(publication) = self.latest_publication(scope_id).await? else {
483 return Ok(None);
484 };
485 self.pinned_manifest_for_publication(scope_id, publication.snapshot_version)
486 .await
487 }
488}
489
490#[derive(Clone)]
497pub struct TypedVersionedRegistry<T> {
498 pub store: Arc<dyn VersionedRegistryStore>,
499 pub scope_id: String,
500 pub kind: String,
501 pub supported_schema_versions: Vec<u32>,
506 pub _phantom: PhantomData<T>,
507}
508
509pub type ScopedVersionedRegistry<T> = TypedVersionedRegistry<T>;
510
511impl<T> TypedVersionedRegistry<T> {
512 #[must_use]
513 pub fn new(
514 store: Arc<dyn VersionedRegistryStore>,
515 scope_id: impl Into<String>,
516 kind: impl Into<String>,
517 ) -> Self {
518 Self {
519 store,
520 scope_id: scope_id.into(),
521 kind: kind.into(),
522 supported_schema_versions: Vec::new(),
523 _phantom: PhantomData,
524 }
525 }
526
527 pub fn try_new(
528 store: Arc<dyn VersionedRegistryStore>,
529 scope_id: impl Into<String>,
530 kind: impl Into<String>,
531 ) -> Result<Self, ScopeError> {
532 let scope_id = ScopeId::new(scope_id.into())?;
533 Ok(Self::new_scoped(store, scope_id, kind))
534 }
535
536 pub fn new_scoped(
537 store: Arc<dyn VersionedRegistryStore>,
538 scope_id: ScopeId,
539 kind: impl Into<String>,
540 ) -> Self {
541 Self {
542 store,
543 scope_id: scope_id.into(),
544 kind: kind.into(),
545 supported_schema_versions: Vec::new(),
546 _phantom: PhantomData,
547 }
548 }
549
550 pub fn scope_id(&self) -> &str {
551 &self.scope_id
552 }
553
554 #[must_use]
559 pub fn with_supported_schema_versions(
560 mut self,
561 versions: impl IntoIterator<Item = u32>,
562 ) -> Self {
563 self.supported_schema_versions = versions.into_iter().collect();
564 self
565 }
566
567 fn check_schema_version(
568 &self,
569 record: &VersionedRecord<Value>,
570 ) -> Result<(), VersionedRegistryError> {
571 if self.supported_schema_versions.is_empty()
572 || self
573 .supported_schema_versions
574 .contains(&record.value_schema_version)
575 {
576 return Ok(());
577 }
578 Err(VersionedRegistryError::IncompatibleSchema {
579 kind: record.kind.clone(),
580 id: record.id.clone(),
581 version: record.version,
582 stored: record.value_schema_version,
583 supported: self.supported_schema_versions.clone(),
584 })
585 }
586
587 #[must_use]
588 pub fn version_ref(&self, id: impl Into<String>, version: u64) -> VersionRef {
589 VersionRef {
590 kind: self.kind.clone(),
591 id: id.into(),
592 version,
593 }
594 }
595}
596
597impl<T> TypedVersionedRegistry<T>
598where
599 T: DeserializeOwned,
600{
601 pub async fn current(
602 &self,
603 id: &str,
604 ) -> Result<Option<VersionedRecord<T>>, VersionedRegistryError> {
605 self.store
606 .current(&self.scope_id, &self.kind, id)
607 .await?
608 .map(|record| {
609 self.check_schema_version(&record)?;
610 decode_record(record)
611 })
612 .transpose()
613 }
614
615 pub async fn get(
616 &self,
617 id: &str,
618 version: u64,
619 ) -> Result<Option<VersionedRecord<T>>, VersionedRegistryError> {
620 self.store
621 .get(&self.scope_id, &self.kind, id, version)
622 .await?
623 .map(|record| {
624 self.check_schema_version(&record)?;
625 decode_record(record)
626 })
627 .transpose()
628 }
629
630 pub async fn list_versions(
631 &self,
632 id: &str,
633 ) -> Result<Vec<VersionedRecord<T>>, VersionedRegistryError> {
634 self.store
635 .list_versions(&self.scope_id, &self.kind, id)
636 .await?
637 .into_iter()
638 .map(|record| {
639 self.check_schema_version(&record)?;
640 decode_record(record)
641 })
642 .collect()
643 }
644
645 pub async fn rollback(
646 &self,
647 id: &str,
648 to_version: u64,
649 metadata: Value,
650 ) -> Result<VersionedRecord<T>, VersionedRegistryError> {
651 let record = self
652 .store
653 .rollback_resource(&self.scope_id, &self.kind, id, to_version, metadata)
654 .await?;
655 self.check_schema_version(&record)?;
656 decode_record(record)
657 }
658}
659
660impl<T> TypedVersionedRegistry<T>
661where
662 T: Serialize + DeserializeOwned,
663{
664 pub async fn publish(
665 &self,
666 id: &str,
667 value: T,
668 value_schema_version: u32,
669 metadata: Value,
670 ) -> Result<PublishOutcome<T>, VersionedRegistryError> {
671 if !self.supported_schema_versions.is_empty()
672 && !self
673 .supported_schema_versions
674 .contains(&value_schema_version)
675 {
676 return Err(VersionedRegistryError::IncompatibleSchema {
677 kind: self.kind.clone(),
678 id: id.to_string(),
679 version: 0,
680 stored: value_schema_version,
681 supported: self.supported_schema_versions.clone(),
682 });
683 }
684 let value = serde_json::to_value(value)
685 .map_err(|error| VersionedRegistryError::Serialization(error.to_string()))?;
686 let outcome = self
687 .store
688 .publish_resource(
689 &self.scope_id,
690 &self.kind,
691 id,
692 value,
693 value_schema_version,
694 metadata,
695 )
696 .await?;
697 decode_publish_outcome(outcome)
698 }
699}
700
701impl<T> TypedVersionedRegistry<T> {
702 pub async fn resource_state(
703 &self,
704 id: &str,
705 ) -> Result<Option<VersionedResourceState>, VersionedRegistryError> {
706 self.store
707 .resource_state(&self.scope_id, &self.kind, id)
708 .await
709 }
710
711 pub async fn archive(&self, id: &str) -> Result<(), VersionedRegistryError> {
712 self.store
713 .archive_resource(&self.scope_id, &self.kind, id)
714 .await
715 }
716
717 pub async fn unarchive(&self, id: &str) -> Result<(), VersionedRegistryError> {
718 self.store
719 .unarchive_resource(&self.scope_id, &self.kind, id)
720 .await
721 }
722}
723
724fn decode_publish_outcome<T>(
725 outcome: PublishOutcome<Value>,
726) -> Result<PublishOutcome<T>, VersionedRegistryError>
727where
728 T: DeserializeOwned,
729{
730 match outcome {
731 PublishOutcome::Created(record) => Ok(PublishOutcome::Created(decode_record(record)?)),
732 PublishOutcome::Noop(record) => Ok(PublishOutcome::Noop(decode_record(record)?)),
733 }
734}
735
736fn decode_record<T>(
737 record: VersionedRecord<Value>,
738) -> Result<VersionedRecord<T>, VersionedRegistryError>
739where
740 T: DeserializeOwned,
741{
742 let value = serde_json::from_value(record.value)
743 .map_err(|error| VersionedRegistryError::Serialization(error.to_string()))?;
744 Ok(VersionedRecord {
745 kind: record.kind,
746 id: record.id,
747 version: record.version,
748 content_hash: record.content_hash,
749 value_schema_version: record.value_schema_version,
750 value,
751 canonical_json_bytes: record.canonical_json_bytes,
752 created_at_ms: record.created_at_ms,
753 metadata: record.metadata,
754 })
755}
756
757#[cfg(test)]
758mod tests;