1use std::collections::{BTreeMap, HashMap, HashSet};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10
11use awaken_runtime_contract::registry_spec::{AgentSpec, ModelPoolSpec, ModelSpec};
12
13use super::versioned_registry::{
14 PinnedRegistryEntry, PinnedRegistryManifest, VersionRef, VersionedRecord,
15 VersionedRegistryError, VersionedRegistryStore,
16};
17
18pub use super::pinned_registry::{
20 REGISTRY_KIND_AGENT, REGISTRY_KIND_MODEL, REGISTRY_KIND_MODEL_POOL, REGISTRY_KIND_PROVIDER,
21};
22
/// Registry kind string for skill resources. The graph validator in this file treats this kind
/// as a leaf: it contributes no outgoing references.
pub const REGISTRY_KIND_SKILL: &str = "skill";
/// Registry kind string for tool resources. Treated as a leaf kind by the graph validator.
pub const REGISTRY_KIND_TOOL: &str = "tool";
/// Registry kind string for plugin configuration resources. Treated as a leaf kind by the graph
/// validator.
pub const REGISTRY_KIND_PLUGIN_CONFIG: &str = "plugin_config";
27
/// Selects the root entries a registry graph validation starts from.
///
/// Every variant carries the `scope_id` that all store lookups for the validation are made in.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum VersionSelector {
    /// Use the latest pinned manifest the store reports for `scope_id`.
    LatestPublication {
        scope_id: String,
    },
    /// Use the pinned manifest of the publication identified by `snapshot_version`.
    Publication {
        scope_id: String,
        snapshot_version: u64,
    },
    /// Use a single stored record (`kind`/`id` at `version`) as the only root. This is the one
    /// selector for which unpinned references are resolved against the store's current records.
    Exact {
        scope_id: String,
        kind: String,
        id: String,
        version: u64,
    },
    /// Use the caller-supplied manifest as the set of roots.
    Manifest {
        scope_id: String,
        manifest: PinnedRegistryManifest,
    },
}
48
/// Policy describing where references between registry resources may point.
///
/// `SameScopeOnly` is the only variant and the default. The standard validator in this file
/// performs every lookup with the root selector's `scope_id`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegistryReferencePolicy {
    /// References are looked up in the same scope as the root.
    #[default]
    SameScopeOnly,
}
54
/// Input to [`RegistryGraphValidator::validate`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegistryGraphValidationRequest {
    /// Selector that determines the root entries of the graph walk.
    pub root: VersionSelector,
    /// Reference policy; falls back to the policy's `Default` when absent from serialized input.
    #[serde(default)]
    pub reference_policy: RegistryReferencePolicy,
}
61
/// Result of a successful registry graph validation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegistryGraphValidationReport {
    /// Every entry accepted during the walk (roots and their transitive references). The
    /// standard validator emits them ordered by kind, then id.
    pub entries: Vec<PinnedRegistryEntry>,
}
66
/// Errors produced while validating a registry graph.
#[derive(Debug, Clone, Error, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegistryGraphValidationError {
    /// A resource (or the latest publication) could not be found at all.
    #[error("missing registry resource {kind}/{id}")]
    MissingResource { kind: String, id: String },
    /// A specific version of a resource (or a specific publication snapshot) is not stored.
    #[error("missing registry version {kind}/{id}@{version}")]
    MissingVersion {
        kind: String,
        id: String,
        version: u64,
    },
    /// A referenced resource is archived. `version` is `None` when the reference was not pinned
    /// to a version; the message then omits the `@version` suffix.
    #[error("archived registry reference {kind}/{id}{version_suffix}", version_suffix = version.map_or(String::new(), |version| format!("@{version}")))]
    ArchivedReference {
        kind: String,
        id: String,
        version: Option<u64>,
    },
    /// The stored record's content hash (`actual`) differs from the pinned hash (`expected`).
    #[error(
        "content hash mismatch for {kind}/{id}@{version}: expected {expected}, actual {actual}"
    )]
    ContentHashMismatch {
        kind: String,
        id: String,
        version: u64,
        expected: String,
        actual: String,
    },
    /// The walk re-entered an entry that is still being visited. `path` runs from the first
    /// occurrence of that entry to the repeated one.
    #[error("registry graph cycle detected: {path:?}")]
    CycleDetected { path: Vec<VersionRef> },
    /// An entry or reference is malformed; `reason` carries the human-readable explanation.
    #[error("invalid registry reference {kind}/{id}: {reason}")]
    InvalidReference {
        kind: String,
        id: String,
        reason: String,
    },
    /// A failure reported by the backing store or by a record's own hash verification, kept as
    /// its display text.
    #[error("registry validation backend error: {0}")]
    Backend(String),
}
104
105impl From<VersionedRegistryError> for RegistryGraphValidationError {
106 fn from(error: VersionedRegistryError) -> Self {
107 Self::Backend(error.to_string())
108 }
109}
110
/// Validates the graph of registry resources reachable from a requested root.
#[async_trait]
pub trait RegistryGraphValidator: Send + Sync {
    /// Validates the graph described by `request`.
    ///
    /// Returns a report listing the accepted entries, or the first
    /// [`RegistryGraphValidationError`] encountered.
    async fn validate(
        &self,
        request: RegistryGraphValidationRequest,
    ) -> Result<RegistryGraphValidationReport, RegistryGraphValidationError>;
}
118
/// [`RegistryGraphValidator`] implementation backed by a [`VersionedRegistryStore`].
pub struct StandardRegistryGraphValidator {
    /// Store used for every record, manifest and resource-state lookup.
    store: Arc<dyn VersionedRegistryStore>,
}
122
123impl StandardRegistryGraphValidator {
124 #[must_use]
125 pub fn new(store: Arc<dyn VersionedRegistryStore>) -> Self {
126 Self { store }
127 }
128
129 async fn root_context(
130 &self,
131 root: VersionSelector,
132 ) -> Result<ValidationContext, RegistryGraphValidationError> {
133 match root {
134 VersionSelector::LatestPublication { scope_id } => {
135 let manifest = self
136 .store
137 .latest_pinned_manifest(&scope_id)
138 .await?
139 .ok_or_else(|| RegistryGraphValidationError::MissingResource {
140 kind: "publication".to_string(),
141 id: "latest".to_string(),
142 })?;
143 ValidationContext::from_manifest(scope_id, manifest, false, true)
144 }
145 VersionSelector::Publication {
146 scope_id,
147 snapshot_version,
148 } => {
149 let manifest = self
150 .store
151 .pinned_manifest_for_publication(&scope_id, snapshot_version)
152 .await?
153 .ok_or_else(|| RegistryGraphValidationError::MissingVersion {
154 kind: "publication".to_string(),
155 id: scope_id.clone(),
156 version: snapshot_version,
157 })?;
158 ValidationContext::from_manifest(scope_id, manifest, false, false)
159 }
160 VersionSelector::Manifest { scope_id, manifest } => {
161 ValidationContext::from_manifest(scope_id, manifest, false, false)
162 }
163 VersionSelector::Exact {
164 scope_id,
165 kind,
166 id,
167 version,
168 } => {
169 let record = self.load_record(&scope_id, &kind, &id, version).await?;
170 let entry = PinnedRegistryEntry {
171 kind,
172 id,
173 version,
174 content_hash: record.content_hash,
175 };
176 ValidationContext::from_entries(scope_id, vec![entry], true, false)
177 }
178 }
179 }
180
181 async fn load_record(
182 &self,
183 scope_id: &str,
184 kind: &str,
185 id: &str,
186 version: u64,
187 ) -> Result<VersionedRecord<serde_json::Value>, RegistryGraphValidationError> {
188 self.store
189 .get(scope_id, kind, id, version)
190 .await?
191 .ok_or_else(|| RegistryGraphValidationError::MissingVersion {
192 kind: kind.to_string(),
193 id: id.to_string(),
194 version,
195 })
196 }
197
198 fn validate_entry<'a>(
199 &'a self,
200 context: &'a mut ValidationContext,
201 entry: PinnedRegistryEntry,
202 ) -> BoxFuture<'a, Result<(), RegistryGraphValidationError>> {
203 Box::pin(async move {
204 let key = ResourceKey::from_entry(&entry);
205 if let Some(existing) = context.accepted.get(&key) {
206 if existing.version == entry.version && existing.content_hash == entry.content_hash
207 {
208 return Ok(());
209 }
210 return Err(RegistryGraphValidationError::InvalidReference {
211 kind: entry.kind,
212 id: entry.id,
213 reason: "conflicting versions for the same resource".to_string(),
214 });
215 }
216
217 if let Some(position) = context.visiting.iter().position(|visited| {
218 visited.kind == entry.kind
219 && visited.id == entry.id
220 && visited.version == entry.version
221 }) {
222 let mut path = context.visiting[position..].to_vec();
223 path.push(VersionRef {
224 kind: entry.kind,
225 id: entry.id,
226 version: entry.version,
227 });
228 return Err(RegistryGraphValidationError::CycleDetected { path });
229 }
230
231 let record = self
232 .validate_stored_entry(
233 &context.scope_id,
234 &entry,
235 context.reject_archived_explicit_entries,
236 )
237 .await?;
238 context.visiting.push(VersionRef {
239 kind: entry.kind.clone(),
240 id: entry.id.clone(),
241 version: entry.version,
242 });
243 let dependencies = self
244 .dependencies_for_record(context, &entry, &record)
245 .await?;
246 for dependency in dependencies {
247 self.validate_entry(context, dependency).await?;
248 }
249 context.visiting.pop();
250 context.accepted.insert(key, entry);
251 Ok(())
252 })
253 }
254
255 async fn validate_stored_entry(
256 &self,
257 scope_id: &str,
258 entry: &PinnedRegistryEntry,
259 reject_archived: bool,
260 ) -> Result<VersionedRecord<serde_json::Value>, RegistryGraphValidationError> {
261 if reject_archived {
262 self.reject_archived(scope_id, &entry.kind, &entry.id, Some(entry.version))
263 .await?;
264 }
265 let record = self
266 .load_record(scope_id, &entry.kind, &entry.id, entry.version)
267 .await?;
268 record
272 .verify_content_hash()
273 .map_err(|error| RegistryGraphValidationError::Backend(error.to_string()))?;
274 if record.content_hash != entry.content_hash {
275 return Err(RegistryGraphValidationError::ContentHashMismatch {
276 kind: entry.kind.clone(),
277 id: entry.id.clone(),
278 version: entry.version,
279 expected: entry.content_hash.clone(),
280 actual: record.content_hash,
281 });
282 }
283 Ok(record)
284 }
285
286 async fn reject_archived(
287 &self,
288 scope_id: &str,
289 kind: &str,
290 id: &str,
291 version: Option<u64>,
292 ) -> Result<(), RegistryGraphValidationError> {
293 let state = self.store.resource_state(scope_id, kind, id).await?;
294 let state = state.ok_or_else(|| RegistryGraphValidationError::MissingResource {
295 kind: kind.to_string(),
296 id: id.to_string(),
297 })?;
298 if state.archived_at_ms.is_some() {
299 return Err(RegistryGraphValidationError::ArchivedReference {
300 kind: kind.to_string(),
301 id: id.to_string(),
302 version,
303 });
304 }
305 Ok(())
306 }
307
308 async fn dependencies_for_record(
309 &self,
310 context: &ValidationContext,
311 entry: &PinnedRegistryEntry,
312 record: &VersionedRecord<serde_json::Value>,
313 ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
314 match entry.kind.as_str() {
315 REGISTRY_KIND_AGENT => self.agent_dependencies(context, entry, record).await,
316 REGISTRY_KIND_MODEL => self.model_dependencies(context, entry, record).await,
317 REGISTRY_KIND_MODEL_POOL => self.model_pool_dependencies(context, entry, record).await,
318 REGISTRY_KIND_PROVIDER
319 | REGISTRY_KIND_SKILL
320 | REGISTRY_KIND_TOOL
321 | REGISTRY_KIND_PLUGIN_CONFIG => Ok(Vec::new()),
322 _ => Err(RegistryGraphValidationError::InvalidReference {
323 kind: entry.kind.clone(),
324 id: entry.id.clone(),
325 reason: "unsupported registry kind".to_string(),
326 }),
327 }
328 }
329
330 async fn agent_dependencies(
331 &self,
332 context: &ValidationContext,
333 entry: &PinnedRegistryEntry,
334 record: &VersionedRecord<serde_json::Value>,
335 ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
336 let spec = serde_json::from_value::<AgentSpec>(record.value.clone()).map_err(|error| {
337 RegistryGraphValidationError::InvalidReference {
338 kind: entry.kind.clone(),
339 id: entry.id.clone(),
340 reason: format!("invalid AgentSpec: {error}"),
341 }
342 })?;
343 if spec.id != entry.id {
344 return Err(RegistryGraphValidationError::InvalidReference {
345 kind: entry.kind.clone(),
346 id: entry.id.clone(),
347 reason: format!("AgentSpec.id {} does not match registry id", spec.id),
348 });
349 }
350
351 let mut dependencies = Vec::new();
352 if spec.endpoint.is_none() {
353 dependencies.push(
354 self.resolve_model_or_pool_reference(context, &spec.model_id)
355 .await?,
356 );
357 }
358 for delegate_id in &spec.delegates {
359 dependencies.push(
360 self.resolve_reference_entry(context, REGISTRY_KIND_AGENT, delegate_id)
361 .await?,
362 );
363 }
364 Ok(dependencies)
365 }
366
367 async fn model_dependencies(
368 &self,
369 context: &ValidationContext,
370 entry: &PinnedRegistryEntry,
371 record: &VersionedRecord<serde_json::Value>,
372 ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
373 let spec = serde_json::from_value::<ModelSpec>(record.value.clone()).map_err(|error| {
374 RegistryGraphValidationError::InvalidReference {
375 kind: entry.kind.clone(),
376 id: entry.id.clone(),
377 reason: format!("invalid ModelSpec: {error}"),
378 }
379 })?;
380 if spec.id != entry.id {
381 return Err(RegistryGraphValidationError::InvalidReference {
382 kind: entry.kind.clone(),
383 id: entry.id.clone(),
384 reason: format!("ModelSpec.id {} does not match registry id", spec.id),
385 });
386 }
387 Ok(vec![
388 self.resolve_reference_entry(context, REGISTRY_KIND_PROVIDER, &spec.provider_id)
389 .await?,
390 ])
391 }
392
393 async fn model_pool_dependencies(
394 &self,
395 context: &ValidationContext,
396 entry: &PinnedRegistryEntry,
397 record: &VersionedRecord<serde_json::Value>,
398 ) -> Result<Vec<PinnedRegistryEntry>, RegistryGraphValidationError> {
399 let spec =
400 serde_json::from_value::<ModelPoolSpec>(record.value.clone()).map_err(|error| {
401 RegistryGraphValidationError::InvalidReference {
402 kind: entry.kind.clone(),
403 id: entry.id.clone(),
404 reason: format!("invalid ModelPoolSpec: {error}"),
405 }
406 })?;
407 if spec.id != entry.id {
408 return Err(RegistryGraphValidationError::InvalidReference {
409 kind: entry.kind.clone(),
410 id: entry.id.clone(),
411 reason: format!("ModelPoolSpec.id {} does not match registry id", spec.id),
412 });
413 }
414 let mut dependencies = Vec::with_capacity(spec.members.len());
415 for member in &spec.members {
416 dependencies.push(
417 self.resolve_reference_entry(context, REGISTRY_KIND_MODEL, &member.model_id)
418 .await?,
419 );
420 }
421 Ok(dependencies)
422 }
423
424 async fn resolve_model_or_pool_reference(
430 &self,
431 context: &ValidationContext,
432 id: &str,
433 ) -> Result<PinnedRegistryEntry, RegistryGraphValidationError> {
434 let model = self
435 .try_reference_entry(context, REGISTRY_KIND_MODEL, id)
436 .await?;
437 let pool = self
438 .try_reference_entry(context, REGISTRY_KIND_MODEL_POOL, id)
439 .await?;
440 match (model, pool) {
441 (Some(_), Some(_)) => Err(RegistryGraphValidationError::InvalidReference {
442 kind: REGISTRY_KIND_MODEL.to_string(),
443 id: id.to_string(),
444 reason: "id resolves to both a model and a model pool".to_string(),
445 }),
446 (Some(entry), None) | (None, Some(entry)) => Ok(entry),
447 (None, None) => Err(RegistryGraphValidationError::MissingResource {
448 kind: REGISTRY_KIND_MODEL.to_string(),
449 id: id.to_string(),
450 }),
451 }
452 }
453
454 async fn try_reference_entry(
459 &self,
460 context: &ValidationContext,
461 kind: &str,
462 id: &str,
463 ) -> Result<Option<PinnedRegistryEntry>, RegistryGraphValidationError> {
464 let key = ResourceKey::new(kind, id);
465 if let Some(entry) = context.candidate_entries.get(&key) {
466 return Ok(Some(entry.clone()));
467 }
468 if !context.allow_current_reference_resolution {
469 return Ok(None);
470 }
471 if self
478 .store
479 .resource_state(&context.scope_id, kind, id)
480 .await?
481 .is_none()
482 {
483 return Ok(None);
484 }
485 self.reject_archived(&context.scope_id, kind, id, None)
486 .await?;
487 let Some(record) = self.store.current(&context.scope_id, kind, id).await? else {
488 return Ok(None);
489 };
490 Ok(Some(PinnedRegistryEntry {
491 kind: kind.to_string(),
492 id: id.to_string(),
493 version: record.version,
494 content_hash: record.content_hash,
495 }))
496 }
497
498 async fn resolve_reference_entry(
499 &self,
500 context: &ValidationContext,
501 kind: &str,
502 id: &str,
503 ) -> Result<PinnedRegistryEntry, RegistryGraphValidationError> {
504 let key = ResourceKey::new(kind, id);
505 if let Some(entry) = context.candidate_entries.get(&key) {
506 return Ok(entry.clone());
507 }
508 if !context.allow_current_reference_resolution {
513 return Err(RegistryGraphValidationError::MissingResource {
514 kind: kind.to_string(),
515 id: id.to_string(),
516 });
517 }
518 self.reject_archived(&context.scope_id, kind, id, None)
519 .await?;
520 let record = self
521 .store
522 .current(&context.scope_id, kind, id)
523 .await?
524 .ok_or_else(|| RegistryGraphValidationError::MissingResource {
525 kind: kind.to_string(),
526 id: id.to_string(),
527 })?;
528 Ok(PinnedRegistryEntry {
529 kind: kind.to_string(),
530 id: id.to_string(),
531 version: record.version,
532 content_hash: record.content_hash,
533 })
534 }
535}
536
537#[async_trait]
538impl RegistryGraphValidator for StandardRegistryGraphValidator {
539 async fn validate(
540 &self,
541 request: RegistryGraphValidationRequest,
542 ) -> Result<RegistryGraphValidationReport, RegistryGraphValidationError> {
543 match request.reference_policy {
544 RegistryReferencePolicy::SameScopeOnly => {}
545 }
546 let mut context = self.root_context(request.root).await?;
547 let roots = context.root_entries.clone();
548 for entry in roots {
549 self.validate_entry(&mut context, entry).await?;
550 }
551 Ok(context.into_report())
552 }
553}
554
/// Version-independent identity of a registry resource: its kind and id.
///
/// The derived ordering compares `kind` first, then `id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ResourceKey {
    /// Registry kind string of the resource.
    kind: String,
    /// Identifier of the resource within its kind.
    id: String,
}
560
561impl ResourceKey {
562 fn new(kind: &str, id: &str) -> Self {
563 Self {
564 kind: kind.to_string(),
565 id: id.to_string(),
566 }
567 }
568
569 fn from_entry(entry: &PinnedRegistryEntry) -> Self {
570 Self::new(&entry.kind, &entry.id)
571 }
572}
573
/// Mutable state carried through one registry graph validation run.
struct ValidationContext {
    /// Scope in which every store lookup of this run is made.
    scope_id: String,
    /// Entries the walk starts from, in the order they were supplied.
    root_entries: Vec<PinnedRegistryEntry>,
    /// The supplied entries indexed by kind and id; references resolve against these first.
    candidate_entries: HashMap<ResourceKey, PinnedRegistryEntry>,
    /// Entries that have been fully validated; becomes the report's entry list.
    accepted: BTreeMap<ResourceKey, PinnedRegistryEntry>,
    /// Stack of entries currently being validated, used to detect cycles.
    visiting: Vec<VersionRef>,
    /// Whether references missing from `candidate_entries` may be resolved to the store's
    /// current record.
    allow_current_reference_resolution: bool,
    /// Whether each validated entry's resource is checked for archival before it is loaded.
    reject_archived_explicit_entries: bool,
}
590
591impl ValidationContext {
592 fn from_manifest(
593 scope_id: String,
594 manifest: PinnedRegistryManifest,
595 allow_current_reference_resolution: bool,
596 reject_archived_explicit_entries: bool,
597 ) -> Result<Self, RegistryGraphValidationError> {
598 Self::from_entries(
599 scope_id,
600 manifest.entries,
601 allow_current_reference_resolution,
602 reject_archived_explicit_entries,
603 )
604 }
605
606 fn from_entries(
607 scope_id: String,
608 entries: Vec<PinnedRegistryEntry>,
609 allow_current_reference_resolution: bool,
610 reject_archived_explicit_entries: bool,
611 ) -> Result<Self, RegistryGraphValidationError> {
612 let mut candidate_entries = HashMap::new();
613 let mut seen = HashSet::new();
614 for entry in &entries {
615 if entry.version == 0 {
616 return Err(RegistryGraphValidationError::InvalidReference {
617 kind: entry.kind.clone(),
618 id: entry.id.clone(),
619 reason: "version cannot be 0".to_string(),
620 });
621 }
622 let key = ResourceKey::from_entry(entry);
623 if !seen.insert(key.clone()) {
624 return Err(RegistryGraphValidationError::InvalidReference {
625 kind: entry.kind.clone(),
626 id: entry.id.clone(),
627 reason: "duplicate manifest entry".to_string(),
628 });
629 }
630 candidate_entries.insert(key, entry.clone());
631 }
632 Ok(Self {
633 scope_id,
634 root_entries: entries,
635 candidate_entries,
636 accepted: BTreeMap::new(),
637 visiting: Vec::new(),
638 allow_current_reference_resolution,
639 reject_archived_explicit_entries,
640 })
641 }
642
643 fn into_report(self) -> RegistryGraphValidationReport {
644 RegistryGraphValidationReport {
645 entries: self.accepted.into_values().collect(),
646 }
647 }
648}