Skip to main content

awaken_server_contract/contract/
registry_graph.rs

1//! Published registry graph validation contracts and default validator.
2
3use std::collections::{BTreeMap, HashMap, HashSet};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10
11use awaken_runtime_contract::registry_spec::{AgentSpec, ModelPoolSpec, ModelSpec};
12
13use super::versioned_registry::{
14    PinnedRegistryEntry, PinnedRegistryManifest, VersionRef, VersionedRecord,
15    VersionedRegistryError, VersionedRegistryStore,
16};
17
18// Pinnable kinds share one source of truth with the manifest builder.
19pub use super::pinned_registry::{
20    REGISTRY_KIND_AGENT, REGISTRY_KIND_MODEL, REGISTRY_KIND_MODEL_POOL, REGISTRY_KIND_PROVIDER,
21};
22
// Server-only registry kinds (validated/published but not pinned in run manifests).
/// Registry kind string for skill resources. The standard validator in this
/// file treats this kind as a leaf: it contributes no further dependencies.
pub const REGISTRY_KIND_SKILL: &str = "skill";
/// Registry kind string for tool resources. Treated as a leaf kind by the
/// standard validator in this file.
pub const REGISTRY_KIND_TOOL: &str = "tool";
/// Registry kind string for plugin configuration resources. Treated as a leaf
/// kind by the standard validator in this file.
pub const REGISTRY_KIND_PLUGIN_CONFIG: &str = "plugin_config";
27
/// Selects the root of a registry graph validation run.
///
/// Every variant carries the `scope_id` that all store lookups for the run
/// are performed in.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum VersionSelector {
    /// Validate the manifest the store reports as the latest pinned manifest
    /// for the scope. The standard validator additionally rejects archived
    /// resources among the manifest's entries for this selector.
    LatestPublication {
        scope_id: String,
    },
    /// Validate the pinned manifest stored for one specific publication
    /// snapshot of the scope.
    Publication {
        scope_id: String,
        snapshot_version: u64,
    },
    /// Validate a single resource version and expand it into its reachable
    /// graph. This is the only selector for which the standard validator
    /// resolves references through the store's current pointer.
    Exact {
        scope_id: String,
        kind: String,
        id: String,
        version: u64,
    },
    /// Validate a caller-supplied manifest as-is, without fetching one from
    /// the store.
    Manifest {
        scope_id: String,
        manifest: PinnedRegistryManifest,
    },
}
48
/// Policy describing which references a validation run may follow.
///
/// There is currently a single variant, which is also the default. The
/// standard validator matches on the policy exhaustively but takes no
/// policy-specific action; all of its lookups use the root's `scope_id`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegistryReferencePolicy {
    /// Default policy. NOTE(review): presumably means references may only
    /// target resources in the same scope as the root — confirm against the ADR.
    #[default]
    SameScopeOnly,
}
54
/// Input to [`RegistryGraphValidator::validate`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegistryGraphValidationRequest {
    /// Which manifest, publication, or single resource version to validate.
    pub root: VersionSelector,
    /// Reference policy for the run; falls back to the policy's `Default`
    /// when the field is absent from the serialized request.
    #[serde(default)]
    pub reference_policy: RegistryReferencePolicy,
}
61
/// Result of a successful validation run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegistryGraphValidationReport {
    /// Every entry that was validated. The standard validator in this file
    /// emits them ordered by `(kind, id)`, one entry per resource.
    pub entries: Vec<PinnedRegistryEntry>,
}
66
/// Failure modes of registry graph validation.
///
/// The type is serializable and comparable, so store-level failures are
/// carried as display text (see [`Backend`](Self::Backend)) rather than as
/// the original error value.
#[derive(Debug, Clone, Error, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegistryGraphValidationError {
    /// A referenced resource (or the requested publication) does not exist,
    /// or it is not pinned in a manifest that must be self-contained.
    #[error("missing registry resource {kind}/{id}")]
    MissingResource { kind: String, id: String },
    /// The resource version (or publication snapshot) named by `version`
    /// could not be loaded from the store.
    #[error("missing registry version {kind}/{id}@{version}")]
    MissingVersion {
        kind: String,
        id: String,
        version: u64,
    },
    /// The referenced resource is archived. `version` is `None` when the
    /// reference was resolved by id only; the message then omits the `@version`
    /// suffix.
    #[error("archived registry reference {kind}/{id}{version_suffix}", version_suffix = version.map_or(String::new(), |version| format!("@{version}")))]
    ArchivedReference {
        kind: String,
        id: String,
        version: Option<u64>,
    },
    /// The content hash pinned in the entry (`expected`) differs from the
    /// hash stored with the record (`actual`).
    #[error(
        "content hash mismatch for {kind}/{id}@{version}: expected {expected}, actual {actual}"
    )]
    ContentHashMismatch {
        kind: String,
        id: String,
        version: u64,
        expected: String,
        actual: String,
    },
    /// A resource version was reached again while it was still being
    /// expanded. `path` runs from the first visit of that version to the
    /// repeated reference, inclusive.
    #[error("registry graph cycle detected: {path:?}")]
    CycleDetected { path: Vec<VersionRef> },
    /// The entry or reference is structurally invalid; `reason` explains why
    /// (for example an undecodable spec, an id mismatch, or a duplicate entry).
    #[error("invalid registry reference {kind}/{id}: {reason}")]
    InvalidReference {
        kind: String,
        id: String,
        reason: String,
    },
    /// A store operation or stored-hash verification failed; the payload is
    /// the underlying error's display text.
    #[error("registry validation backend error: {0}")]
    Backend(String),
}
104
105impl From<VersionedRegistryError> for RegistryGraphValidationError {
106    fn from(error: VersionedRegistryError) -> Self {
107        Self::Backend(error.to_string())
108    }
109}
110
/// Validates the registry graph reachable from a requested root.
///
/// Implementations must be `Send + Sync` so a validator can be shared across
/// tasks.
#[async_trait]
pub trait RegistryGraphValidator: Send + Sync {
    /// Validates the graph selected by `request.root`.
    ///
    /// # Errors
    ///
    /// Returns a [`RegistryGraphValidationError`] describing the first
    /// problem encountered.
    async fn validate(
        &self,
        request: RegistryGraphValidationRequest,
    ) -> Result<RegistryGraphValidationReport, RegistryGraphValidationError>;
}
118
/// Default [`RegistryGraphValidator`] backed by a [`VersionedRegistryStore`].
pub struct StandardRegistryGraphValidator {
    /// Store used for every manifest, record, and resource-state lookup.
    store: Arc<dyn VersionedRegistryStore>,
}
122
123impl StandardRegistryGraphValidator {
124    #[must_use]
125    pub fn new(store: Arc<dyn VersionedRegistryStore>) -> Self {
126        Self { store }
127    }
128
129    async fn root_context(
130        &self,
131        root: VersionSelector,
132    ) -> Result<ValidationContext, RegistryGraphValidationError> {
133        match root {
134            VersionSelector::LatestPublication { scope_id } => {
135                let manifest = self
136                    .store
137                    .latest_pinned_manifest(&scope_id)
138                    .await?
139                    .ok_or_else(|| RegistryGraphValidationError::MissingResource {
140                        kind: "publication".to_string(),
141                        id: "latest".to_string(),
142                    })?;
143                ValidationContext::from_manifest(scope_id, manifest, false, true)
144            }
145            VersionSelector::Publication {
146                scope_id,
147                snapshot_version,
148            } => {
149                let manifest = self
150                    .store
151                    .pinned_manifest_for_publication(&scope_id, snapshot_version)
152                    .await?
153                    .ok_or_else(|| RegistryGraphValidationError::MissingVersion {
154                        kind: "publication".to_string(),
155                        id: scope_id.clone(),
156                        version: snapshot_version,
157                    })?;
158                ValidationContext::from_manifest(scope_id, manifest, false, false)
159            }
160            VersionSelector::Manifest { scope_id, manifest } => {
161                ValidationContext::from_manifest(scope_id, manifest, false, false)
162            }
163            VersionSelector::Exact {
164                scope_id,
165                kind,
166                id,
167                version,
168            } => {
169                let record = self.load_record(&scope_id, &kind, &id, version).await?;
170                let entry = PinnedRegistryEntry {
171                    kind,
172                    id,
173                    version,
174                    content_hash: record.content_hash,
175                };
176                ValidationContext::from_entries(scope_id, vec![entry], true, false)
177            }
178        }
179    }
180
181    async fn load_record(
182        &self,
183        scope_id: &str,
184        kind: &str,
185        id: &str,
186        version: u64,
187    ) -> Result<VersionedRecord<serde_json::Value>, RegistryGraphValidationError> {
188        self.store
189            .get(scope_id, kind, id, version)
190            .await?
191            .ok_or_else(|| RegistryGraphValidationError::MissingVersion {
192                kind: kind.to_string(),
193                id: id.to_string(),
194                version,
195            })
196    }
197
198    fn validate_entry<'a>(
199        &'a self,
200        context: &'a mut ValidationContext,
201        entry: PinnedRegistryEntry,
202    ) -> BoxFuture<'a, Result<(), RegistryGraphValidationError>> {
203        Box::pin(async move {
204            let key = ResourceKey::from_entry(&entry);
205            if let Some(existing) = context.accepted.get(&key) {
206                if existing.version == entry.version && existing.content_hash == entry.content_hash
207                {
208                    return Ok(());
209                }
210                return Err(RegistryGraphValidationError::InvalidReference {
211                    kind: entry.kind,
212                    id: entry.id,
213                    reason: "conflicting versions for the same resource".to_string(),
214                });
215            }
216
217            if let Some(position) = context.visiting.iter().position(|visited| {
218                visited.kind == entry.kind
219                    && visited.id == entry.id
220                    && visited.version == entry.version
221            }) {
222                let mut path = context.visiting[position..].to_vec();
223                path.push(VersionRef {
224                    kind: entry.kind,
225                    id: entry.id,
226                    version: entry.version,
227                });
228                return Err(RegistryGraphValidationError::CycleDetected { path });
229            }
230
231            let record = self
232                .validate_stored_entry(
233                    &context.scope_id,
234                    &entry,
235                    context.reject_archived_explicit_entries,
236                )
237                .await?;
238            context.visiting.push(VersionRef {
239                kind: entry.kind.clone(),
240                id: entry.id.clone(),
241                version: entry.version,
242            });
243            let dependencies = self
244                .dependencies_for_record(context, &entry, &record)
245                .await?;
246            for dependency in dependencies {
247                self.validate_entry(context, dependency).await?;
248            }
249            context.visiting.pop();
250            context.accepted.insert(key, entry);
251            Ok(())
252        })
253    }
254
255    async fn validate_stored_entry(
256        &self,
257        scope_id: &str,
258        entry: &PinnedRegistryEntry,
259        reject_archived: bool,
260    ) -> Result<VersionedRecord<serde_json::Value>, RegistryGraphValidationError> {
261        if reject_archived {
262            self.reject_archived(scope_id, &entry.kind, &entry.id, Some(entry.version))
263                .await?;
264        }
265        let record = self
266            .load_record(scope_id, &entry.kind, &entry.id, entry.version)
267            .await?;
268        // ADR-0035 D3/D9: re-derive the SHA-256 from canonical_json_bytes
269        // and compare to the stored content_hash before trusting either
270        // column. Without this the manifest hash becomes decorative.
271        record
272            .verify_content_hash()
273            .map_err(|error| RegistryGraphValidationError::Backend(error.to_string()))?;
274        if record.content_hash != entry.content_hash {
275            return Err(RegistryGraphValidationError::ContentHashMismatch {
276                kind: entry.kind.clone(),
277                id: entry.id.clone(),
278                version: entry.version,
279                expected: entry.content_hash.clone(),
280                actual: record.content_hash,
281            });
282        }
283        Ok(record)
284    }
285
286    async fn reject_archived(
287        &self,
288        scope_id: &str,
289        kind: &str,
290        id: &str,
291        version: Option<u64>,
292    ) -> Result<(), RegistryGraphValidationError> {
293        let state = self.store.resource_state(scope_id, kind, id).await?;
294        let state = state.ok_or_else(|| RegistryGraphValidationError::MissingResource {
295            kind: kind.to_string(),
296            id: id.to_string(),
297        })?;
298        if state.archived_at_ms.is_some() {
299            return Err(RegistryGraphValidationError::ArchivedReference {
300                kind: kind.to_string(),
301                id: id.to_string(),
302                version,
303            });
304        }
305        Ok(())
306    }
307
308    async fn dependencies_for_record(
309        &self,
310        context: &ValidationContext,
311        entry: &PinnedRegistryEntry,
312        record: &VersionedRecord<serde_json::Value>,
313    ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
314        match entry.kind.as_str() {
315            REGISTRY_KIND_AGENT => self.agent_dependencies(context, entry, record).await,
316            REGISTRY_KIND_MODEL => self.model_dependencies(context, entry, record).await,
317            REGISTRY_KIND_MODEL_POOL => self.model_pool_dependencies(context, entry, record).await,
318            REGISTRY_KIND_PROVIDER
319            | REGISTRY_KIND_SKILL
320            | REGISTRY_KIND_TOOL
321            | REGISTRY_KIND_PLUGIN_CONFIG => Ok(Vec::new()),
322            _ => Err(RegistryGraphValidationError::InvalidReference {
323                kind: entry.kind.clone(),
324                id: entry.id.clone(),
325                reason: "unsupported registry kind".to_string(),
326            }),
327        }
328    }
329
330    async fn agent_dependencies(
331        &self,
332        context: &ValidationContext,
333        entry: &PinnedRegistryEntry,
334        record: &VersionedRecord<serde_json::Value>,
335    ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
336        let spec = serde_json::from_value::<AgentSpec>(record.value.clone()).map_err(|error| {
337            RegistryGraphValidationError::InvalidReference {
338                kind: entry.kind.clone(),
339                id: entry.id.clone(),
340                reason: format!("invalid AgentSpec: {error}"),
341            }
342        })?;
343        if spec.id != entry.id {
344            return Err(RegistryGraphValidationError::InvalidReference {
345                kind: entry.kind.clone(),
346                id: entry.id.clone(),
347                reason: format!("AgentSpec.id {} does not match registry id", spec.id),
348            });
349        }
350
351        let mut dependencies = Vec::new();
352        if spec.endpoint.is_none() {
353            dependencies.push(
354                self.resolve_model_or_pool_reference(context, &spec.model_id)
355                    .await?,
356            );
357        }
358        for delegate_id in &spec.delegates {
359            dependencies.push(
360                self.resolve_reference_entry(context, REGISTRY_KIND_AGENT, delegate_id)
361                    .await?,
362            );
363        }
364        Ok(dependencies)
365    }
366
367    async fn model_dependencies(
368        &self,
369        context: &ValidationContext,
370        entry: &PinnedRegistryEntry,
371        record: &VersionedRecord<serde_json::Value>,
372    ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
373        let spec = serde_json::from_value::<ModelSpec>(record.value.clone()).map_err(|error| {
374            RegistryGraphValidationError::InvalidReference {
375                kind: entry.kind.clone(),
376                id: entry.id.clone(),
377                reason: format!("invalid ModelSpec: {error}"),
378            }
379        })?;
380        if spec.id != entry.id {
381            return Err(RegistryGraphValidationError::InvalidReference {
382                kind: entry.kind.clone(),
383                id: entry.id.clone(),
384                reason: format!("ModelSpec.id {} does not match registry id", spec.id),
385            });
386        }
387        Ok(vec![
388            self.resolve_reference_entry(context, REGISTRY_KIND_PROVIDER, &spec.provider_id)
389                .await?,
390        ])
391    }
392
393    async fn model_pool_dependencies(
394        &self,
395        context: &ValidationContext,
396        entry: &PinnedRegistryEntry,
397        record: &VersionedRecord<serde_json::Value>,
398    ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
399        let spec =
400            serde_json::from_value::<ModelPoolSpec>(record.value.clone()).map_err(|error| {
401                RegistryGraphValidationError::InvalidReference {
402                    kind: entry.kind.clone(),
403                    id: entry.id.clone(),
404                    reason: format!("invalid ModelPoolSpec: {error}"),
405                }
406            })?;
407        if spec.id != entry.id {
408            return Err(RegistryGraphValidationError::InvalidReference {
409                kind: entry.kind.clone(),
410                id: entry.id.clone(),
411                reason: format!("ModelPoolSpec.id {} does not match registry id", spec.id),
412            });
413        }
414        let mut dependencies = Vec::with_capacity(spec.members.len());
415        for member in &spec.members {
416            dependencies.push(
417                self.resolve_reference_entry(context, REGISTRY_KIND_MODEL, &member.model_id)
418                    .await?,
419            );
420        }
421        Ok(dependencies)
422    }
423
424    /// Resolve an agent's `model_id`, which may name either a single model or a
425    /// pool (pools share the model id namespace). An id that resolves to *both*
426    /// a model and a pool is ambiguous and rejected, matching the runtime
427    /// resolver's `AmbiguousModelReference`; resolving to exactly one is
428    /// returned; resolving to neither reports a missing model.
429    async fn resolve_model_or_pool_reference(
430        &self,
431        context: &ValidationContext,
432        id: &str,
433    ) -> Result<PinnedRegistryEntry, RegistryGraphValidationError> {
434        let model = self
435            .try_reference_entry(context, REGISTRY_KIND_MODEL, id)
436            .await?;
437        let pool = self
438            .try_reference_entry(context, REGISTRY_KIND_MODEL_POOL, id)
439            .await?;
440        match (model, pool) {
441            (Some(_), Some(_)) => Err(RegistryGraphValidationError::InvalidReference {
442                kind: REGISTRY_KIND_MODEL.to_string(),
443                id: id.to_string(),
444                reason: "id resolves to both a model and a model pool".to_string(),
445            }),
446            (Some(entry), None) | (None, Some(entry)) => Ok(entry),
447            (None, None) => Err(RegistryGraphValidationError::MissingResource {
448                kind: REGISTRY_KIND_MODEL.to_string(),
449                id: id.to_string(),
450            }),
451        }
452    }
453
454    /// Like [`resolve_reference_entry`](Self::resolve_reference_entry) but
455    /// returns `Ok(None)` when the resource is absent instead of an error, so a
456    /// caller can try an alternative kind. Other failures (archived, store)
457    /// still propagate.
458    async fn try_reference_entry(
459        &self,
460        context: &ValidationContext,
461        kind: &str,
462        id: &str,
463    ) -> Result<Option<PinnedRegistryEntry>, RegistryGraphValidationError> {
464        let key = ResourceKey::new(kind, id);
465        if let Some(entry) = context.candidate_entries.get(&key) {
466            return Ok(Some(entry.clone()));
467        }
468        if !context.allow_current_reference_resolution {
469            return Ok(None);
470        }
471        // Honor the soft contract: a resource that simply does not exist for
472        // this kind is `Ok(None)` so the caller can try the other kind (an id
473        // may name a model OR a pool). `reject_archived` reports absence as
474        // `MissingResource`, so probe existence first and only run the archived
475        // check when the resource is actually present — an archived resource
476        // still propagates as an error.
477        if self
478            .store
479            .resource_state(&context.scope_id, kind, id)
480            .await?
481            .is_none()
482        {
483            return Ok(None);
484        }
485        self.reject_archived(&context.scope_id, kind, id, None)
486            .await?;
487        let Some(record) = self.store.current(&context.scope_id, kind, id).await? else {
488            return Ok(None);
489        };
490        Ok(Some(PinnedRegistryEntry {
491            kind: kind.to_string(),
492            id: id.to_string(),
493            version: record.version,
494            content_hash: record.content_hash,
495        }))
496    }
497
498    async fn resolve_reference_entry(
499        &self,
500        context: &ValidationContext,
501        kind: &str,
502        id: &str,
503    ) -> Result<PinnedRegistryEntry, RegistryGraphValidationError> {
504        let key = ResourceKey::new(kind, id);
505        if let Some(entry) = context.candidate_entries.get(&key) {
506            return Ok(entry.clone());
507        }
508        // ADR-0035 D9: pinned-manifest/publication validation must fail
509        // closed when a transitive reference is absent. Only `Exact` opts
510        // into expanding through the store's current pointer, and its
511        // output is frozen into a manifest before execution begins.
512        if !context.allow_current_reference_resolution {
513            return Err(RegistryGraphValidationError::MissingResource {
514                kind: kind.to_string(),
515                id: id.to_string(),
516            });
517        }
518        self.reject_archived(&context.scope_id, kind, id, None)
519            .await?;
520        let record = self
521            .store
522            .current(&context.scope_id, kind, id)
523            .await?
524            .ok_or_else(|| RegistryGraphValidationError::MissingResource {
525                kind: kind.to_string(),
526                id: id.to_string(),
527            })?;
528        Ok(PinnedRegistryEntry {
529            kind: kind.to_string(),
530            id: id.to_string(),
531            version: record.version,
532            content_hash: record.content_hash,
533        })
534    }
535}
536
537#[async_trait]
538impl RegistryGraphValidator for StandardRegistryGraphValidator {
539    async fn validate(
540        &self,
541        request: RegistryGraphValidationRequest,
542    ) -> Result<RegistryGraphValidationReport, RegistryGraphValidationError> {
543        match request.reference_policy {
544            RegistryReferencePolicy::SameScopeOnly => {}
545        }
546        let mut context = self.root_context(request.root).await?;
547        let roots = context.root_entries.clone();
548        for entry in roots {
549            self.validate_entry(&mut context, entry).await?;
550        }
551        Ok(context.into_report())
552    }
553}
554
/// Identity of a registry resource independent of its version: `(kind, id)`.
///
/// Used as the key of the candidate and accepted maps; the derived ordering
/// compares `kind` first, then `id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ResourceKey {
    kind: String,
    id: String,
}
560
561impl ResourceKey {
562    fn new(kind: &str, id: &str) -> Self {
563        Self {
564            kind: kind.to_string(),
565            id: id.to_string(),
566        }
567    }
568
569    fn from_entry(entry: &PinnedRegistryEntry) -> Self {
570        Self::new(&entry.kind, &entry.id)
571    }
572}
573
/// Mutable state threaded through one validation run.
struct ValidationContext {
    /// Scope every store lookup of the run is performed in.
    scope_id: String,
    /// Entries the run starts from, in the order they were supplied.
    root_entries: Vec<PinnedRegistryEntry>,
    /// Supplied entries indexed by `(kind, id)`; references are resolved
    /// against this map before anything else.
    candidate_entries: HashMap<ResourceKey, PinnedRegistryEntry>,
    /// Entries whose whole dependency subtree has been validated. Becomes
    /// the report; the `BTreeMap` keeps it ordered by `(kind, id)`.
    accepted: BTreeMap<ResourceKey, PinnedRegistryEntry>,
    /// Stack of entries currently being expanded, used for cycle detection.
    visiting: Vec<VersionRef>,
    /// Allow the validator to resolve missing references against the
    /// store's current pointer. ADR-0035 D9 forbids this for run-time
    /// resume materialization (Manifest, Publication, LatestPublication
    /// must observe only the frozen entries). It is only enabled for
    /// `Exact`, where the caller asked to expand a single root into its
    /// reachable graph and the resulting `report.entries` will be saved
    /// as the persisted manifest before run execution starts.
    allow_current_reference_resolution: bool,
    /// When set, every entry that is validated has its resource's archived
    /// state checked before its record is loaded.
    reject_archived_explicit_entries: bool,
}
590
591impl ValidationContext {
592    fn from_manifest(
593        scope_id: String,
594        manifest: PinnedRegistryManifest,
595        allow_current_reference_resolution: bool,
596        reject_archived_explicit_entries: bool,
597    ) -> Result<Self, RegistryGraphValidationError> {
598        Self::from_entries(
599            scope_id,
600            manifest.entries,
601            allow_current_reference_resolution,
602            reject_archived_explicit_entries,
603        )
604    }
605
606    fn from_entries(
607        scope_id: String,
608        entries: Vec<PinnedRegistryEntry>,
609        allow_current_reference_resolution: bool,
610        reject_archived_explicit_entries: bool,
611    ) -> Result<Self, RegistryGraphValidationError> {
612        let mut candidate_entries = HashMap::new();
613        let mut seen = HashSet::new();
614        for entry in &entries {
615            if entry.version == 0 {
616                return Err(RegistryGraphValidationError::InvalidReference {
617                    kind: entry.kind.clone(),
618                    id: entry.id.clone(),
619                    reason: "version cannot be 0".to_string(),
620                });
621            }
622            let key = ResourceKey::from_entry(entry);
623            if !seen.insert(key.clone()) {
624                return Err(RegistryGraphValidationError::InvalidReference {
625                    kind: entry.kind.clone(),
626                    id: entry.id.clone(),
627                    reason: "duplicate manifest entry".to_string(),
628                });
629            }
630            candidate_entries.insert(key, entry.clone());
631        }
632        Ok(Self {
633            scope_id,
634            root_entries: entries,
635            candidate_entries,
636            accepted: BTreeMap::new(),
637            visiting: Vec::new(),
638            allow_current_reference_resolution,
639            reject_archived_explicit_entries,
640        })
641    }
642
643    fn into_report(self) -> RegistryGraphValidationReport {
644        RegistryGraphValidationReport {
645            entries: self.accepted.into_values().collect(),
646        }
647    }
648}