Skip to main content

mqdb_core/
schema.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::error::{Error, Result};
5use crate::keys;
6use crate::storage::{BatchWriter, Storage};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub enum FieldType {
13    #[serde(alias = "string")]
14    String,
15    #[serde(alias = "number")]
16    Number,
17    #[serde(alias = "boolean")]
18    Boolean,
19    #[serde(alias = "array")]
20    Array,
21    #[serde(alias = "object")]
22    Object,
23    #[serde(alias = "null")]
24    Null,
25}
26
27#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
28pub struct FieldDefinition {
29    #[serde(default)]
30    pub name: String,
31    #[serde(alias = "type")]
32    pub field_type: FieldType,
33    #[serde(default)]
34    pub required: bool,
35    pub default: Option<Value>,
36}
37
38impl FieldDefinition {
39    pub fn new(name: impl Into<String>, field_type: FieldType) -> Self {
40        Self {
41            name: name.into(),
42            field_type,
43            required: false,
44            default: None,
45        }
46    }
47
48    #[must_use]
49    pub fn required(mut self) -> Self {
50        self.required = true;
51        self
52    }
53
54    #[must_use]
55    pub fn with_default(mut self, value: Value) -> Self {
56        self.default = Some(value);
57        self
58    }
59
60    fn validate_value(&self, value: &Value) -> bool {
61        matches!(
62            (&self.field_type, value),
63            (FieldType::String, Value::String(_))
64                | (FieldType::Number, Value::Number(_))
65                | (FieldType::Boolean, Value::Bool(_))
66                | (FieldType::Array, Value::Array(_))
67                | (FieldType::Object, Value::Object(_))
68                | (FieldType::Null, Value::Null)
69        )
70    }
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct Schema {
75    pub entity: String,
76    pub fields: HashMap<String, FieldDefinition>,
77    #[serde(default = "default_schema_version")]
78    pub version: u64,
79}
80
/// Serde fallback for `Schema::version`: a schema serialized without a
/// version is treated as version 1.
fn default_schema_version() -> u64 {
    const INITIAL_VERSION: u64 = 1;
    INITIAL_VERSION
}
84
85impl Schema {
86    pub fn new(entity: impl Into<String>) -> Self {
87        Self {
88            entity: entity.into(),
89            fields: HashMap::new(),
90            version: 1,
91        }
92    }
93
94    #[must_use]
95    pub fn with_fields(
96        entity: impl Into<String>,
97        fields: HashMap<String, FieldDefinition>,
98    ) -> Self {
99        Self {
100            entity: entity.into(),
101            fields,
102            version: 1,
103        }
104    }
105
106    #[must_use]
107    pub fn add_field(mut self, field: FieldDefinition) -> Self {
108        self.fields.insert(field.name.clone(), field);
109        self
110    }
111
112    /// # Errors
113    /// Returns an error if the data does not conform to the schema.
114    pub fn validate(&self, data: &Value) -> Result<()> {
115        let obj = data.as_object().ok_or_else(|| Error::SchemaViolation {
116            entity: self.entity.clone(),
117            field: "<root>".to_string(),
118            reason: "entity data must be an object".to_string(),
119        })?;
120
121        for (field_name, field_def) in &self.fields {
122            match obj.get(field_name) {
123                Some(value) => {
124                    if !field_def.validate_value(value) {
125                        return Err(Error::SchemaViolation {
126                            entity: self.entity.clone(),
127                            field: field_name.clone(),
128                            reason: format!(
129                                "expected type {:?}, got {}",
130                                field_def.field_type,
131                                match value {
132                                    Value::String(_) => "string",
133                                    Value::Number(_) => "number",
134                                    Value::Bool(_) => "boolean",
135                                    Value::Array(_) => "array",
136                                    Value::Object(_) => "object",
137                                    Value::Null => "null",
138                                }
139                            ),
140                        });
141                    }
142                }
143                None => {
144                    if field_def.required && field_def.default.is_none() {
145                        return Err(Error::SchemaViolation {
146                            entity: self.entity.clone(),
147                            field: field_name.clone(),
148                            reason: "required field is missing".to_string(),
149                        });
150                    }
151                }
152            }
153        }
154
155        Ok(())
156    }
157
158    /// # Errors
159    /// Returns an error if the data is not an object.
160    pub fn apply_defaults(&self, data: &mut Value) -> Result<()> {
161        let obj = data.as_object_mut().ok_or_else(|| Error::SchemaViolation {
162            entity: self.entity.clone(),
163            field: "<root>".to_string(),
164            reason: "entity data must be an object".to_string(),
165        })?;
166
167        for (field_name, field_def) in &self.fields {
168            if !obj.contains_key(field_name)
169                && let Some(default) = &field_def.default
170            {
171                obj.insert(field_name.clone(), default.clone());
172            }
173        }
174
175        Ok(())
176    }
177}
178
179pub struct SchemaRegistry {
180    schemas: HashMap<String, Schema>,
181}
182
183impl SchemaRegistry {
184    #[allow(clippy::must_use_candidate)]
185    pub fn new() -> Self {
186        Self {
187            schemas: HashMap::new(),
188        }
189    }
190
191    pub fn add_schema(&mut self, mut schema: Schema) {
192        if let Some(existing) = self.schemas.get(&schema.entity) {
193            if existing.fields == schema.fields {
194                schema.version = existing.version;
195            } else {
196                schema.version = existing.version + 1;
197            }
198        }
199        self.schemas.insert(schema.entity.clone(), schema);
200    }
201
202    #[must_use]
203    pub fn get_schema(&self, entity: &str) -> Option<&Schema> {
204        self.schemas.get(entity)
205    }
206
207    /// # Errors
208    /// Returns an error if the entity has a schema and validation fails.
209    pub fn validate_entity(&self, entity_name: &str, data: &Value) -> Result<()> {
210        if let Some(schema) = self.schemas.get(entity_name) {
211            schema.validate(data)?;
212        }
213        Ok(())
214    }
215
216    /// # Errors
217    /// Returns an error if the entity has a schema and applying defaults fails.
218    pub fn apply_defaults(&self, entity_name: &str, data: &mut Value) -> Result<()> {
219        if let Some(schema) = self.schemas.get(entity_name) {
220            schema.apply_defaults(data)?;
221        }
222        Ok(())
223    }
224
225    /// # Errors
226    /// Returns an error if serialization fails.
227    pub fn persist_schema(&self, batch: &mut BatchWriter, schema: &Schema) -> Result<()> {
228        let key = keys::encode_schema_key(&schema.entity);
229        let value = serde_json::to_vec(schema)?;
230        batch.insert(key, value);
231        Ok(())
232    }
233
234    /// # Errors
235    /// Returns an error if reading or deserializing schemas fails.
236    pub fn load_schemas(&mut self, storage: &Storage) -> Result<()> {
237        let prefix = b"meta/schema/";
238        let items = storage.prefix_scan(prefix)?;
239
240        for (_key, value) in items {
241            let schema: Schema = serde_json::from_slice(&value)?;
242            self.schemas.insert(schema.entity.clone(), schema);
243        }
244
245        Ok(())
246    }
247
248    /// # Errors
249    /// Returns `SchemaViolation` if any field doesn't exist in the entity's schema.
250    pub fn validate_fields_exist(
251        &self,
252        entity_name: &str,
253        fields: &[&str],
254        context: &str,
255    ) -> Result<()> {
256        if let Some(schema) = self.schemas.get(entity_name) {
257            for field in fields {
258                if *field != "id" && !schema.fields.contains_key(*field) {
259                    return Err(Error::SchemaViolation {
260                        entity: entity_name.to_string(),
261                        field: (*field).to_string(),
262                        reason: format!("{context} field does not exist in schema"),
263                    });
264                }
265            }
266        }
267        Ok(())
268    }
269
270    #[must_use]
271    pub fn entity_names(&self) -> Vec<String> {
272        self.schemas.keys().cloned().collect()
273    }
274
275    pub fn remove_schema(&mut self, batch: &mut BatchWriter, entity: &str) {
276        self.schemas.remove(entity);
277        let key = keys::encode_schema_key(entity);
278        batch.remove(key);
279    }
280}
281
282impl Default for SchemaRegistry {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a `users` schema with a required string `name` plus `extra`.
    fn users_schema_with(extra: FieldDefinition) -> Schema {
        Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required())
            .add_field(extra)
    }

    /// Version currently registered for `entity`; panics if it is unknown.
    fn registered_version(registry: &SchemaRegistry, entity: &str) -> u64 {
        registry.get_schema(entity).unwrap().version
    }

    // The builder methods populate every attribute of the definition.
    #[test]
    fn test_field_definition() {
        let definition = FieldDefinition::new("name", FieldType::String)
            .required()
            .with_default(json!("anonymous"));

        assert_eq!(definition.name, "name");
        assert_eq!(definition.field_type, FieldType::String);
        assert!(definition.required);
        assert_eq!(definition.default, Some(json!("anonymous")));
    }

    // Data matching every declared field passes validation.
    #[test]
    fn test_schema_validation_valid() {
        let schema = users_schema_with(FieldDefinition::new("age", FieldType::Number));
        let payload = json!({ "name": "Alice", "age": 30 });

        assert!(schema.validate(&payload).is_ok());
    }

    // A required field without a default must be present.
    #[test]
    fn test_schema_validation_missing_required() {
        let schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required());
        let payload = json!({ "age": 30 });

        let outcome = schema.validate(&payload);
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(Error::SchemaViolation { .. })));
    }

    // A present field of the wrong JSON type is rejected.
    #[test]
    fn test_schema_validation_wrong_type() {
        let schema = Schema::new("users").add_field(FieldDefinition::new("age", FieldType::Number));
        let payload = json!({ "age": "thirty" });

        let outcome = schema.validate(&payload);
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(Error::SchemaViolation { .. })));
    }

    // Missing fields with a declared default are filled in.
    #[test]
    fn test_schema_apply_defaults() {
        let status = FieldDefinition::new("status", FieldType::String).with_default(json!("active"));
        let schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String))
            .add_field(status);

        let mut payload = json!({ "name": "Alice" });
        schema.apply_defaults(&mut payload).unwrap();

        assert_eq!(payload["status"], "active");
    }

    // The registry validates through the schema registered for the entity.
    #[test]
    fn test_schema_registry() {
        let mut registry = SchemaRegistry::new();
        registry.add_schema(
            Schema::new("users")
                .add_field(FieldDefinition::new("name", FieldType::String).required()),
        );

        let accepted = json!({"name": "Alice"});
        assert!(registry.validate_entity("users", &accepted).is_ok());

        let rejected = json!({"age": 30});
        assert!(registry.validate_entity("users", &rejected).is_err());
    }

    // Re-registering identical fields keeps the version; changed fields bump it.
    #[test]
    fn test_schema_version_stable_on_identical_fields() {
        let mut registry = SchemaRegistry::new();

        registry.add_schema(users_schema_with(FieldDefinition::new(
            "age",
            FieldType::Number,
        )));
        assert_eq!(registered_version(&registry, "users"), 1);

        registry.add_schema(users_schema_with(FieldDefinition::new(
            "age",
            FieldType::Number,
        )));
        assert_eq!(registered_version(&registry, "users"), 1);

        registry.add_schema(users_schema_with(FieldDefinition::new(
            "email",
            FieldType::String,
        )));
        assert_eq!(registered_version(&registry, "users"), 2);
    }
}