Skip to main content

vela_protocol/
schema_registry.rs

1//! Content-addressed schema/reducer artifacts.
2//!
3//! Implements the schema-artifact part of `docs/THEORY.md`
4//! Section 5.1, where event tuples reference a content-addressed
5//! `schema` field that pins the replay semantics:
6//!
7//! > schema = content-addressed schema and reducer reference
8//!
9//! And §5.5:
10//!
11//! > Schema and reducer artifacts are fixed by content hash.
12//!
13//! ## What this module ships
14//!
//! - [`SchemaArtifact`]: a typed, content-addressed artifact whose
//!   id is `vsa_` plus the hex of the first 16 bytes of the SHA-256 of its canonical content.
17//! - [`SchemaRegistry`]: a registry mapping artifact id to artifact,
18//!   used to verify that an event references a known schema before
19//!   replay.
20//! - Verification primitives that future event-replay code can call
21//!   to check schema availability and detect schema drift.
22//!
23//! ## What this module does NOT do
24//!
25//! It does not yet replace the existing `StateEvent::schema: String`
26//! version-tag field. That replacement is a wider substrate change
27//! (target v0.85+) that ripples into canonicalization,
28//! event-id derivation, and existing event-set hashes. This module
29//! ships the artifact + registry primitive on which that
30//! replacement will sit.
31
32use std::collections::BTreeMap;
33
34use serde::{Deserialize, Serialize};
35use sha2::{Digest, Sha256};
36
37/// A content-addressed schema or reducer artifact.
38///
39/// The `id` is derived from the canonical serialization of
40/// `(name, version, body)`. Equal artifacts have equal ids;
41/// different artifacts have ids that differ except with negligible
42/// probability under SHA-256.
43#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
44pub struct SchemaArtifact {
45    /// Content-addressed id, prefixed with `vsa_` (Vela Schema
46    /// Artifact). This is `H(canonical(name, version, body))`.
47    pub id: String,
48    /// Human-readable name (e.g. `vela.event.finding_asserted`).
49    pub name: String,
50    /// Semver-style version string (e.g. `v0.1`).
51    pub version: String,
52    /// Body of the artifact: the actual schema or reducer
53    /// specification, kept as a JSON value to avoid committing to
54    /// any one schema language at the substrate layer.
55    pub body: serde_json::Value,
56}
57
58impl SchemaArtifact {
59    /// Build a new artifact, computing the content-addressed id
60    /// from the canonical serialization of `(name, version, body)`.
61    pub fn new(
62        name: impl Into<String>,
63        version: impl Into<String>,
64        body: serde_json::Value,
65    ) -> Result<Self, String> {
66        let name = name.into();
67        let version = version.into();
68        let id = Self::derive_id(&name, &version, &body)?;
69        Ok(Self {
70            id,
71            name,
72            version,
73            body,
74        })
75    }
76
77    /// Derive the content-addressed id without constructing an
78    /// artifact. Useful for verifying that a stored artifact's id
79    /// matches its content.
80    pub fn derive_id(
81        name: &str,
82        version: &str,
83        body: &serde_json::Value,
84    ) -> Result<String, String> {
85        // Canonical form: a JSON object with sorted keys
86        // {body, name, version}. We use BTreeMap to enforce key
87        // ordering. The body is a JSON value already, so we
88        // canonicalize it via serde_json with sorted keys.
89        let canonical = canonical_json(&serde_json::json!({
90            "body": body,
91            "name": name,
92            "version": version,
93        }))?;
94        let mut hasher = Sha256::new();
95        hasher.update(canonical.as_bytes());
96        let hash = hasher.finalize();
97        Ok(format!("vsa_{}", hex::encode(&hash[..16])))
98    }
99
100    /// Verify that this artifact's stored id matches the id derived
101    /// from its content.
102    pub fn verify_id(&self) -> Result<(), String> {
103        let derived = Self::derive_id(&self.name, &self.version, &self.body)?;
104        if derived == self.id {
105            Ok(())
106        } else {
107            Err(format!(
108                "schema artifact id mismatch: stored={}, derived={}",
109                self.id, derived
110            ))
111        }
112    }
113}
114
115/// Canonicalize a JSON value: sort all object keys recursively and
116/// serialize without whitespace. This is the canonicalization
117/// scheme used for content-addressing schema artifacts.
118fn canonical_json(value: &serde_json::Value) -> Result<String, String> {
119    fn canon(v: &serde_json::Value) -> serde_json::Value {
120        match v {
121            serde_json::Value::Object(map) => {
122                let mut sorted: BTreeMap<String, serde_json::Value> = BTreeMap::new();
123                for (k, vv) in map {
124                    sorted.insert(k.clone(), canon(vv));
125                }
126                let mut out = serde_json::Map::new();
127                for (k, vv) in sorted {
128                    out.insert(k, vv);
129                }
130                serde_json::Value::Object(out)
131            }
132            serde_json::Value::Array(items) => {
133                serde_json::Value::Array(items.iter().map(canon).collect())
134            }
135            other => other.clone(),
136        }
137    }
138    serde_json::to_string(&canon(value)).map_err(|e| format!("canonicalize: {e}"))
139}
140
141/// A registry of known schema artifacts.
142///
143/// Replay code uses the registry to verify that every event's
144/// schema reference points to an available artifact. Missing
145/// artifacts mean the event cannot be replayed deterministically;
146/// federation policy must fetch missing artifacts before replay
147/// proceeds (analogous to the missing-ancestor case in §5.2).
148#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
149pub struct SchemaRegistry {
150    artifacts: BTreeMap<String, SchemaArtifact>,
151}
152
153impl SchemaRegistry {
154    /// Empty registry.
155    #[must_use]
156    pub fn empty() -> Self {
157        Self::default()
158    }
159
160    /// Insert an artifact, verifying its id matches its content.
161    ///
162    /// Returns an error if the artifact's stored id does not match
163    /// its derived id. This catches tampering at registration time
164    /// rather than at replay time.
165    pub fn insert(&mut self, artifact: SchemaArtifact) -> Result<(), String> {
166        artifact.verify_id()?;
167        self.artifacts.insert(artifact.id.clone(), artifact);
168        Ok(())
169    }
170
171    /// Look up an artifact by id.
172    pub fn get(&self, id: &str) -> Option<&SchemaArtifact> {
173        self.artifacts.get(id)
174    }
175
176    /// Whether the registry contains an artifact with the given id.
177    #[must_use]
178    pub fn contains(&self, id: &str) -> bool {
179        self.artifacts.contains_key(id)
180    }
181
182    /// Number of artifacts in the registry.
183    #[must_use]
184    pub fn len(&self) -> usize {
185        self.artifacts.len()
186    }
187
188    /// Whether the registry is empty.
189    #[must_use]
190    pub fn is_empty(&self) -> bool {
191        self.artifacts.is_empty()
192    }
193
194    /// All artifact ids in canonical (sorted) order.
195    pub fn ids(&self) -> impl Iterator<Item = &str> {
196        self.artifacts.keys().map(String::as_str)
197    }
198
199    /// Detect schema artifacts referenced by `referenced` but
200    /// missing from the registry. Used at replay-time to determine
201    /// whether replay can proceed.
202    pub fn missing<I: AsRef<str>>(&self, referenced: &[I]) -> Vec<String> {
203        referenced
204            .iter()
205            .filter(|id| !self.artifacts.contains_key(id.as_ref()))
206            .map(|id| id.as_ref().to_string())
207            .collect()
208    }
209
210    /// Inspect a slice of events and return any
211    /// `schema_artifact_id` values that are not present in the
212    /// registry. Events with `schema_artifact_id == None` are
213    /// skipped (they predate the artifact-registry mechanism per
214    /// docs/THEORY.md §5.1 and use the legacy string `schema`
215    /// field).
216    ///
217    /// Returned ids are deduplicated and sorted lexically.
218    pub fn unknown_event_artifacts(&self, events: &[crate::events::StateEvent]) -> Vec<String> {
219        let mut seen = std::collections::BTreeSet::new();
220        let mut missing = std::collections::BTreeSet::new();
221        for ev in events {
222            let Some(id) = ev.schema_artifact_id.as_deref() else {
223                continue;
224            };
225            if seen.contains(id) {
226                continue;
227            }
228            seen.insert(id.to_string());
229            if !self.artifacts.contains_key(id) {
230                missing.insert(id.to_string());
231            }
232        }
233        missing.into_iter().collect()
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A small JSON-Schema-like body shared by tests that need a
    /// non-trivial artifact.
    fn sample_body() -> serde_json::Value {
        json!({
            "type": "object",
            "required": ["finding_id", "actor"],
            "properties": {
                "finding_id": {"type": "string"},
                "actor": {"type": "string"},
            }
        })
    }

    #[test]
    fn artifact_id_is_content_addressed() {
        let build =
            || SchemaArtifact::new("event.finding_asserted", "v0.1", sample_body()).unwrap();
        let first = build();
        let second = build();
        assert_eq!(first.id, second.id);
        assert!(first.id.starts_with("vsa_"));
    }

    #[test]
    fn different_content_yields_different_ids() {
        let baseline =
            SchemaArtifact::new("event.finding_asserted", "v0.1", sample_body()).unwrap();

        // Same name and body, different version.
        let bumped =
            SchemaArtifact::new("event.finding_asserted", "v0.2", sample_body()).unwrap();
        assert_ne!(baseline.id, bumped.id);

        // Same name and version, body with one extra property.
        let mut extended_body = sample_body();
        extended_body["properties"]["new_field"] = json!({"type": "string"});
        let extended =
            SchemaArtifact::new("event.finding_asserted", "v0.1", extended_body).unwrap();
        assert_ne!(baseline.id, extended.id);
    }

    #[test]
    fn verify_id_rejects_tampered_artifact() {
        let mut artifact = SchemaArtifact::new("event.x", "v0.1", json!({"k": "v"})).unwrap();
        assert!(artifact.verify_id().is_ok());
        // Change the body while leaving the stored id as it was.
        artifact.body = json!({"k": "v2"});
        assert!(artifact.verify_id().is_err());
    }

    #[test]
    fn canonical_json_sorts_keys_recursively() {
        let shuffled = canonical_json(&json!({"b": 1, "a": {"d": 4, "c": 3}})).unwrap();
        let ordered = canonical_json(&json!({"a": {"c": 3, "d": 4}, "b": 1})).unwrap();
        assert_eq!(shuffled, ordered);
    }

    #[test]
    fn key_order_does_not_affect_id() {
        let forward = SchemaArtifact::new("x", "v0.1", json!({"a": 1, "b": 2})).unwrap();
        let reversed = SchemaArtifact::new("x", "v0.1", json!({"b": 2, "a": 1})).unwrap();
        assert_eq!(forward.id, reversed.id);
    }

    #[test]
    fn registry_insert_and_lookup() {
        let artifact = SchemaArtifact::new("event.x", "v0.1", sample_body()).unwrap();
        let key = artifact.id.clone();

        let mut registry = SchemaRegistry::empty();
        registry.insert(artifact).unwrap();

        assert!(registry.contains(&key));
        assert!(registry.get(&key).is_some());
        assert_eq!(registry.len(), 1);
    }

    #[test]
    fn registry_rejects_tampered_artifact_at_insert() {
        let mut artifact = SchemaArtifact::new("event.x", "v0.1", sample_body()).unwrap();
        // Swap the body but keep the id that was derived from the old one.
        artifact.body = json!({"different": "content"});

        let mut registry = SchemaRegistry::empty();
        assert!(registry.insert(artifact).is_err());
        assert!(registry.is_empty());
    }

    #[test]
    fn missing_returns_unregistered_ids() {
        let artifact = SchemaArtifact::new("event.x", "v0.1", sample_body()).unwrap();
        let known = artifact.id.clone();
        let mut registry = SchemaRegistry::empty();
        registry.insert(artifact).unwrap();

        let referenced = [
            known,
            "vsa_unknown1".to_string(),
            "vsa_unknown2".to_string(),
        ];
        assert_eq!(
            registry.missing(&referenced),
            vec!["vsa_unknown1", "vsa_unknown2"]
        );
    }

    #[test]
    fn missing_returns_empty_when_all_present() {
        let artifact = SchemaArtifact::new("event.x", "v0.1", sample_body()).unwrap();
        let key = artifact.id.clone();
        let mut registry = SchemaRegistry::empty();
        registry.insert(artifact).unwrap();
        assert!(registry.missing(&[key]).is_empty());
    }

    #[test]
    fn registry_serde_round_trip() {
        let mut registry = SchemaRegistry::empty();
        registry
            .insert(SchemaArtifact::new("event.x", "v0.1", sample_body()).unwrap())
            .unwrap();
        registry
            .insert(SchemaArtifact::new("event.y", "v0.2", json!({"different": true})).unwrap())
            .unwrap();

        let encoded = serde_json::to_string(&registry).unwrap();
        let decoded: SchemaRegistry = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, registry);
    }

    #[test]
    fn id_uses_vsa_prefix_and_hex() {
        let artifact = SchemaArtifact::new("x", "v0.1", json!({})).unwrap();
        assert!(artifact.id.starts_with("vsa_"));
        let digest_hex = &artifact.id["vsa_".len()..];
        // 16 digest bytes, two hex characters each.
        assert_eq!(digest_hex.len(), 32);
        assert!(digest_hex.bytes().all(|b| b.is_ascii_hexdigit()));
    }

    #[test]
    fn ids_are_returned_in_canonical_order() {
        let mut registry = SchemaRegistry::empty();
        // Names are deliberately not in alphabetical order.
        for name in ["zeta", "alpha", "beta"] {
            let artifact = SchemaArtifact::new(name, "v0.1", json!({"n": name})).unwrap();
            registry.insert(artifact).unwrap();
        }
        // Whatever the insertion order, `ids` must come back sorted.
        let listed: Vec<&str> = registry.ids().collect();
        let mut expected = listed.clone();
        expected.sort_unstable();
        assert_eq!(listed, expected);
    }

    /// Build a minimal `StateEvent` whose id is `vev_<id_seed>` and
    /// whose `schema_artifact_id` is `artifact`.
    fn sample_event(id_seed: &str, artifact: Option<&str>) -> crate::events::StateEvent {
        use crate::events::{StateActor, StateEvent, StateTarget};

        let target = StateTarget {
            r#type: "finding".into(),
            id: "vf_x".into(),
        };
        let actor = StateActor {
            id: "test".into(),
            r#type: "system".into(),
        };
        StateEvent {
            schema: "vela.event.v0.1".into(),
            id: format!("vev_{id_seed}"),
            kind: "test.event".into(),
            target,
            actor,
            timestamp: "2026-05-09T00:00:00Z".into(),
            reason: "test".into(),
            before_hash: String::new(),
            after_hash: String::new(),
            payload: serde_json::Value::Null,
            caveats: Vec::new(),
            signature: None,
            schema_artifact_id: artifact.map(str::to_owned),
        }
    }

    #[test]
    fn unknown_event_artifacts_returns_only_missing_referenced_ids() {
        let registered = SchemaArtifact::new("event.x", "v0.1", json!({})).unwrap();
        let registered_id = registered.id.clone();
        let mut registry = SchemaRegistry::empty();
        registry.insert(registered).unwrap();

        let events = vec![
            // References a registered artifact, so it is not reported.
            sample_event("001", Some(&registered_id)),
            // References an artifact the registry has never seen.
            sample_event("002", Some("vsa_unknown")),
            // Carries no artifact reference at all and is skipped.
            sample_event("003", None),
        ];
        assert_eq!(
            registry.unknown_event_artifacts(&events),
            vec!["vsa_unknown"]
        );
    }

    #[test]
    fn unknown_event_artifacts_deduplicates() {
        let registry = SchemaRegistry::empty();
        let events: Vec<_> = ["001", "002", "003"]
            .iter()
            .map(|seed| sample_event(seed, Some("vsa_missing")))
            .collect();
        assert_eq!(
            registry.unknown_event_artifacts(&events),
            vec!["vsa_missing"]
        );
    }

    #[test]
    fn schema_artifact_id_does_not_affect_event_id() {
        // Invariant under test: attaching a schema_artifact_id must
        // leave event.id untouched. If it did not, historical events
        // would need migrating and replay determinism would break on
        // every existing hub.
        use crate::events::compute_event_id;

        // sample_event fills in a placeholder id; compute_event_id
        // rederives the id rather than reading that placeholder.
        let id_without = compute_event_id(&sample_event("001", None));
        let id_with = compute_event_id(&sample_event("001", Some("vsa_someartifact")));
        assert_eq!(
            id_without, id_with,
            "schema_artifact_id must not be part of canonical event-id preimage"
        );
    }

    #[test]
    fn pre_v0_89_events_serialize_byte_identically() {
        // With schema_artifact_id = None the field must be omitted
        // from the serialized form so pre-v0.89 frontiers round-trip.
        let json = serde_json::to_string(&sample_event("001", None)).unwrap();
        assert!(
            !json.contains("schema_artifact_id"),
            "schema_artifact_id should be skipped when None; full json: {json}"
        );
    }

    #[test]
    fn v0_89_event_with_artifact_includes_field() {
        let json = serde_json::to_string(&sample_event("001", Some("vsa_test"))).unwrap();
        assert!(
            json.contains("schema_artifact_id"),
            "schema_artifact_id should appear when Some"
        );
        assert!(
            json.contains("vsa_test"),
            "the artifact id value should appear"
        );
    }
}
481            json.contains("vsa_test"),
482            "the artifact id value should appear"
483        );
484    }
485}