Skip to main content

liminal/channel/schema.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use jsonschema::Validator;
5use serde_json::{Map, Value};
6use uuid::Uuid;
7
8/// Unique identifier for a schema version that validated a message.
9#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
10pub struct SchemaId(Uuid);
11
12impl SchemaId {
13    /// Generates a new schema identifier.
14    #[must_use]
15    pub fn new() -> Self {
16        Self(Uuid::new_v4())
17    }
18
19    /// Wraps an existing UUID as a schema identifier.
20    #[must_use]
21    pub const fn from_uuid(uuid: Uuid) -> Self {
22        Self(uuid)
23    }
24
25    /// Returns the underlying UUID.
26    #[must_use]
27    pub const fn as_uuid(self) -> Uuid {
28        self.0
29    }
30}
31
32impl Default for SchemaId {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38/// JSON Schema-backed message contract for a channel.
39#[derive(Clone, Debug)]
40pub struct Schema {
41    id: SchemaId,
42    definition: Arc<Value>,
43    validator: Arc<Validator>,
44    defaults: Arc<BTreeMap<String, Value>>,
45}
46
47impl Schema {
48    /// Builds a schema from a JSON Schema definition.
49    ///
50    /// # Errors
51    ///
52    /// Returns [`SchemaValidationError::InvalidSchema`] when the definition cannot be compiled.
53    pub fn new(definition: Value) -> Result<Self, SchemaValidationError> {
54        Self::with_id(SchemaId::new(), definition)
55    }
56
57    /// Builds a schema from a JSON Schema definition with an explicit identifier.
58    ///
59    /// # Errors
60    ///
61    /// Returns [`SchemaValidationError::InvalidSchema`] when the definition cannot be compiled.
62    pub fn with_id(id: SchemaId, definition: Value) -> Result<Self, SchemaValidationError> {
63        let defaults = collect_object_defaults(&definition)?;
64        Self::from_parts(id, definition, defaults)
65    }
66
67    /// Returns the schema version identifier.
68    #[must_use]
69    pub const fn id(&self) -> SchemaId {
70        self.id
71    }
72
73    /// Returns the wrapped JSON Schema definition.
74    #[must_use]
75    pub fn definition(&self) -> &Value {
76        &self.definition
77    }
78
79    /// Validates JSON payload bytes after applying known schema defaults.
80    ///
81    /// # Errors
82    ///
83    /// Returns [`SchemaValidationError`] when the payload is not JSON, cannot receive required
84    /// defaults, or does not match this schema.
85    pub fn validate<Payload>(&self, payload: Payload) -> Result<(), SchemaValidationError>
86    where
87        Payload: AsRef<[u8]>,
88    {
89        let value = self.normalized_value(payload)?;
90        self.validate_value(&value)
91    }
92
93    /// Validates payload bytes and returns the normalized JSON bytes to deliver.
94    ///
95    /// # Errors
96    ///
97    /// Returns [`SchemaValidationError`] when the payload cannot be parsed, normalized, validated,
98    /// or serialized for delivery.
99    pub fn validate_and_apply_defaults<Payload>(
100        &self,
101        payload: Payload,
102    ) -> Result<Vec<u8>, SchemaValidationError>
103    where
104        Payload: AsRef<[u8]>,
105    {
106        let value = self.normalized_value(payload)?;
107        self.validate_value(&value)?;
108        serde_json::to_vec(&value).map_err(|source| SchemaValidationError::Serialize { source })
109    }
110
111    /// Evolves an object schema by adding a required field with a default value.
112    ///
113    /// # Errors
114    ///
115    /// Returns [`SchemaValidationError`] when this is not an object schema, the field schema is
116    /// invalid, the default does not match the field schema, or the evolved schema cannot compile.
117    pub fn evolve_add_field(
118        &self,
119        name: impl Into<String>,
120        field_schema: Value,
121        default: Value,
122    ) -> Result<Self, SchemaValidationError> {
123        let name = name.into();
124        if name.is_empty() {
125            return Err(SchemaValidationError::EmptyFieldName);
126        }
127        validate_default(&field_schema, &default)?;
128
129        let mut definition = (*self.definition).clone();
130        if !is_object_schema(&definition) {
131            return Err(SchemaValidationError::NotObjectSchema);
132        }
133
134        let document = definition
135            .as_object_mut()
136            .ok_or(SchemaValidationError::NotObjectSchema)?;
137        insert_property_schema(document, &name, field_schema, &default)?;
138        insert_required_field(document, &name)?;
139
140        let mut defaults = (*self.defaults).clone();
141        defaults.insert(name, default);
142        Self::from_parts(SchemaId::new(), definition, defaults)
143    }
144
145    fn from_parts(
146        id: SchemaId,
147        definition: Value,
148        defaults: BTreeMap<String, Value>,
149    ) -> Result<Self, SchemaValidationError> {
150        let validator = jsonschema::validator_for(&definition).map_err(|error| {
151            SchemaValidationError::InvalidSchema {
152                message: error.to_string(),
153            }
154        })?;
155
156        Ok(Self {
157            id,
158            definition: Arc::new(definition),
159            validator: Arc::new(validator),
160            defaults: Arc::new(defaults),
161        })
162    }
163
164    fn normalized_value<Payload>(&self, payload: Payload) -> Result<Value, SchemaValidationError>
165    where
166        Payload: AsRef<[u8]>,
167    {
168        let mut value = serde_json::from_slice(payload.as_ref())
169            .map_err(|source| SchemaValidationError::InvalidJson { source })?;
170        self.apply_defaults(&mut value)?;
171        Ok(value)
172    }
173
174    fn apply_defaults(&self, value: &mut Value) -> Result<(), SchemaValidationError> {
175        if self.defaults.is_empty() {
176            return Ok(());
177        }
178
179        let object = value
180            .as_object_mut()
181            .ok_or(SchemaValidationError::PayloadNotObject)?;
182        for (field, default) in self.defaults.iter() {
183            object
184                .entry(field.clone())
185                .or_insert_with(|| default.clone());
186        }
187        Ok(())
188    }
189
190    fn validate_value(&self, value: &Value) -> Result<(), SchemaValidationError> {
191        self.validator
192            .validate(value)
193            .map_err(|error| SchemaValidationError::Mismatch {
194                message: error.to_string(),
195            })
196    }
197}
198
199/// Errors returned while compiling schemas, validating payloads, or evolving schemas.
200#[derive(Debug, thiserror::Error)]
201pub enum SchemaValidationError {
202    /// The payload was not syntactically valid JSON.
203    #[error("invalid JSON payload: {source}")]
204    InvalidJson { source: serde_json::Error },
205    /// The JSON Schema definition could not be compiled.
206    #[error("invalid JSON Schema: {message}")]
207    InvalidSchema { message: String },
208    /// The payload did not satisfy the JSON Schema definition.
209    #[error("payload does not match schema: {message}")]
210    Mismatch { message: String },
211    /// Schema evolution can only add fields to object schemas.
212    #[error("schema evolution only supports object schemas")]
213    NotObjectSchema,
214    /// Schema evolution field names must be non-empty.
215    #[error("schema evolution field name must not be empty")]
216    EmptyFieldName,
217    /// The schema's properties member was not an object.
218    #[error("object schema properties must be an object")]
219    InvalidProperties,
220    /// The schema's required member was not an array of strings.
221    #[error("object schema required must be an array of strings")]
222    InvalidRequired,
223    /// The payload cannot receive object-field defaults because it is not an object.
224    #[error("payload must be a JSON object to apply schema defaults")]
225    PayloadNotObject,
226    /// The normalized JSON payload could not be serialized for delivery.
227    #[error("failed to serialize normalized payload: {source}")]
228    Serialize { source: serde_json::Error },
229}
230
231fn collect_object_defaults(
232    definition: &Value,
233) -> Result<BTreeMap<String, Value>, SchemaValidationError> {
234    let Some(document) = definition.as_object() else {
235        return Ok(BTreeMap::new());
236    };
237    let Some(properties) = document.get("properties") else {
238        return Ok(BTreeMap::new());
239    };
240    let properties = properties
241        .as_object()
242        .ok_or(SchemaValidationError::InvalidProperties)?;
243
244    let defaults = properties
245        .iter()
246        .filter_map(|(field, schema)| {
247            schema
248                .as_object()
249                .and_then(|field_schema| field_schema.get("default"))
250                .map(|default| (field.clone(), default.clone()))
251        })
252        .collect();
253    Ok(defaults)
254}
255
256fn validate_default(field_schema: &Value, default: &Value) -> Result<(), SchemaValidationError> {
257    let validator = jsonschema::validator_for(field_schema).map_err(|error| {
258        SchemaValidationError::InvalidSchema {
259            message: error.to_string(),
260        }
261    })?;
262    validator
263        .validate(default)
264        .map_err(|error| SchemaValidationError::Mismatch {
265            message: format!("default value does not match field schema: {error}"),
266        })
267}
268
269fn is_object_schema(definition: &Value) -> bool {
270    let Some(document) = definition.as_object() else {
271        return false;
272    };
273
274    match document.get("type") {
275        Some(Value::String(schema_type)) => schema_type == "object",
276        Some(Value::Array(schema_types)) => schema_types
277            .iter()
278            .any(|schema_type| schema_type.as_str() == Some("object")),
279        Some(_) => false,
280        None => document.contains_key("properties"),
281    }
282}
283
284fn insert_property_schema(
285    document: &mut Map<String, Value>,
286    name: &str,
287    mut field_schema: Value,
288    default: &Value,
289) -> Result<(), SchemaValidationError> {
290    let field_document =
291        field_schema
292            .as_object_mut()
293            .ok_or_else(|| SchemaValidationError::InvalidSchema {
294                message: "field schema must be a JSON Schema object".to_owned(),
295            })?;
296    field_document.insert("default".to_owned(), default.clone());
297
298    let properties = document
299        .entry("properties".to_owned())
300        .or_insert_with(|| Value::Object(Map::new()));
301    let properties = properties
302        .as_object_mut()
303        .ok_or(SchemaValidationError::InvalidProperties)?;
304    properties.insert(name.to_owned(), field_schema);
305    Ok(())
306}
307
308fn insert_required_field(
309    document: &mut Map<String, Value>,
310    name: &str,
311) -> Result<(), SchemaValidationError> {
312    let required = document
313        .entry("required".to_owned())
314        .or_insert_with(|| Value::Array(Vec::new()));
315    let required = required
316        .as_array_mut()
317        .ok_or(SchemaValidationError::InvalidRequired)?;
318
319    if required.iter().any(|item| item.as_str().is_none()) {
320        return Err(SchemaValidationError::InvalidRequired);
321    }
322    if !required.iter().any(|item| item.as_str() == Some(name)) {
323        required.push(Value::String(name.to_owned()));
324    }
325    Ok(())
326}
327
#[cfg(test)]
mod tests {
    use super::{Schema, SchemaId, SchemaValidationError};
    use serde_json::{Value, json};

    /// Strict order schema shared by the tests: two required fields, nothing else allowed.
    fn order_schema() -> Result<Schema, SchemaValidationError> {
        let definition = json!({
            "type": "object",
            "properties": {
                "order_id": {"type": "string"},
                "quantity": {"type": "integer", "minimum": 1}
            },
            "required": ["order_id", "quantity"],
            "additionalProperties": false
        });
        Schema::new(definition)
    }

    #[test]
    fn schema_is_clone_send_sync() {
        // Compile-time check only: the call fails to build if `Schema` loses any of these bounds.
        fn assert_bounds<T>()
        where
            T: Clone + Send + Sync + std::fmt::Debug,
        {
        }

        assert_bounds::<Schema>();
    }

    #[test]
    fn validates_payload_against_json_schema() -> Result<(), SchemaValidationError> {
        let schema = order_schema()?;

        // A conforming payload passes.
        schema.validate(br#"{"order_id":"A1","quantity":3}"#)?;

        // A quantity below the declared minimum is rejected as a mismatch.
        let rejected = schema.validate(br#"{"order_id":"A1","quantity":0}"#);
        assert!(matches!(
            rejected,
            Err(SchemaValidationError::Mismatch { .. })
        ));
        Ok(())
    }

    #[test]
    fn evolution_adds_defaulted_field_and_changes_schema_id() -> Result<(), SchemaValidationError> {
        let original = order_schema()?;
        let original_id = original.id();

        let evolved =
            original.evolve_add_field("priority", json!({"type":"string"}), json!("normal"))?;

        // A payload written against the old schema gains the default for the new field.
        let delivered =
            evolved.validate_and_apply_defaults(br#"{"order_id":"A1","quantity":3}"#)?;
        let decoded: Value = serde_json::from_slice(&delivered)
            .map_err(|source| SchemaValidationError::InvalidJson { source })?;

        assert_ne!(evolved.id(), original_id);
        assert_eq!(decoded.get("priority"), Some(&json!("normal")));
        Ok(())
    }

    #[test]
    fn evolution_rejects_non_object_schema() -> Result<(), SchemaValidationError> {
        let array_schema = Schema::new(json!({"type":"array"}))?;

        let outcome =
            array_schema.evolve_add_field("priority", json!({"type":"string"}), json!("normal"));

        assert!(matches!(
            outcome,
            Err(SchemaValidationError::NotObjectSchema)
        ));
        Ok(())
    }

    #[test]
    fn explicit_schema_id_is_preserved() -> Result<(), SchemaValidationError> {
        let expected = SchemaId::new();

        let schema = Schema::with_id(expected, json!({"type":"object"}))?;

        assert_eq!(schema.id(), expected);
        Ok(())
    }
}