1use crate::error::{Error, Result};
5use crate::keys;
6use crate::storage::{BatchWriter, Storage};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub enum FieldType {
13 #[serde(alias = "string")]
14 String,
15 #[serde(alias = "number")]
16 Number,
17 #[serde(alias = "boolean")]
18 Boolean,
19 #[serde(alias = "array")]
20 Array,
21 #[serde(alias = "object")]
22 Object,
23 #[serde(alias = "null")]
24 Null,
25}
26
27#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
28pub struct FieldDefinition {
29 #[serde(default)]
30 pub name: String,
31 #[serde(alias = "type")]
32 pub field_type: FieldType,
33 #[serde(default)]
34 pub required: bool,
35 pub default: Option<Value>,
36}
37
38impl FieldDefinition {
39 pub fn new(name: impl Into<String>, field_type: FieldType) -> Self {
40 Self {
41 name: name.into(),
42 field_type,
43 required: false,
44 default: None,
45 }
46 }
47
48 #[must_use]
49 pub fn required(mut self) -> Self {
50 self.required = true;
51 self
52 }
53
54 #[must_use]
55 pub fn with_default(mut self, value: Value) -> Self {
56 self.default = Some(value);
57 self
58 }
59
60 fn validate_value(&self, value: &Value) -> bool {
61 matches!(
62 (&self.field_type, value),
63 (FieldType::String, Value::String(_))
64 | (FieldType::Number, Value::Number(_))
65 | (FieldType::Boolean, Value::Bool(_))
66 | (FieldType::Array, Value::Array(_))
67 | (FieldType::Object, Value::Object(_))
68 | (FieldType::Null, Value::Null)
69 )
70 }
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct Schema {
75 pub entity: String,
76 pub fields: HashMap<String, FieldDefinition>,
77 #[serde(default = "default_schema_version")]
78 pub version: u64,
79}
80
/// Serde fallback for `Schema::version` when the field is absent from serialized input.
fn default_schema_version() -> u64 {
    const INITIAL_VERSION: u64 = 1;
    INITIAL_VERSION
}
84
85impl Schema {
86 pub fn new(entity: impl Into<String>) -> Self {
87 Self {
88 entity: entity.into(),
89 fields: HashMap::new(),
90 version: 1,
91 }
92 }
93
94 #[must_use]
95 pub fn with_fields(
96 entity: impl Into<String>,
97 fields: HashMap<String, FieldDefinition>,
98 ) -> Self {
99 Self {
100 entity: entity.into(),
101 fields,
102 version: 1,
103 }
104 }
105
106 #[must_use]
107 pub fn add_field(mut self, field: FieldDefinition) -> Self {
108 self.fields.insert(field.name.clone(), field);
109 self
110 }
111
112 pub fn validate(&self, data: &Value) -> Result<()> {
115 let obj = data.as_object().ok_or_else(|| Error::SchemaViolation {
116 entity: self.entity.clone(),
117 field: "<root>".to_string(),
118 reason: "entity data must be an object".to_string(),
119 })?;
120
121 for (field_name, field_def) in &self.fields {
122 match obj.get(field_name) {
123 Some(value) => {
124 if !field_def.validate_value(value) {
125 return Err(Error::SchemaViolation {
126 entity: self.entity.clone(),
127 field: field_name.clone(),
128 reason: format!(
129 "expected type {:?}, got {}",
130 field_def.field_type,
131 match value {
132 Value::String(_) => "string",
133 Value::Number(_) => "number",
134 Value::Bool(_) => "boolean",
135 Value::Array(_) => "array",
136 Value::Object(_) => "object",
137 Value::Null => "null",
138 }
139 ),
140 });
141 }
142 }
143 None => {
144 if field_def.required && field_def.default.is_none() {
145 return Err(Error::SchemaViolation {
146 entity: self.entity.clone(),
147 field: field_name.clone(),
148 reason: "required field is missing".to_string(),
149 });
150 }
151 }
152 }
153 }
154
155 Ok(())
156 }
157
158 pub fn apply_defaults(&self, data: &mut Value) -> Result<()> {
161 let obj = data.as_object_mut().ok_or_else(|| Error::SchemaViolation {
162 entity: self.entity.clone(),
163 field: "<root>".to_string(),
164 reason: "entity data must be an object".to_string(),
165 })?;
166
167 for (field_name, field_def) in &self.fields {
168 if !obj.contains_key(field_name)
169 && let Some(default) = &field_def.default
170 {
171 obj.insert(field_name.clone(), default.clone());
172 }
173 }
174
175 Ok(())
176 }
177}
178
179pub struct SchemaRegistry {
180 schemas: HashMap<String, Schema>,
181}
182
183impl SchemaRegistry {
184 #[allow(clippy::must_use_candidate)]
185 pub fn new() -> Self {
186 Self {
187 schemas: HashMap::new(),
188 }
189 }
190
191 pub fn add_schema(&mut self, mut schema: Schema) {
192 if let Some(existing) = self.schemas.get(&schema.entity) {
193 if existing.fields == schema.fields {
194 schema.version = existing.version;
195 } else {
196 schema.version = existing.version + 1;
197 }
198 }
199 self.schemas.insert(schema.entity.clone(), schema);
200 }
201
202 #[must_use]
203 pub fn get_schema(&self, entity: &str) -> Option<&Schema> {
204 self.schemas.get(entity)
205 }
206
207 pub fn validate_entity(&self, entity_name: &str, data: &Value) -> Result<()> {
210 if let Some(schema) = self.schemas.get(entity_name) {
211 schema.validate(data)?;
212 }
213 Ok(())
214 }
215
216 pub fn apply_defaults(&self, entity_name: &str, data: &mut Value) -> Result<()> {
219 if let Some(schema) = self.schemas.get(entity_name) {
220 schema.apply_defaults(data)?;
221 }
222 Ok(())
223 }
224
225 pub fn persist_schema(&self, batch: &mut BatchWriter, schema: &Schema) -> Result<()> {
228 let key = keys::encode_schema_key(&schema.entity);
229 let value = serde_json::to_vec(schema)?;
230 batch.insert(key, value);
231 Ok(())
232 }
233
234 pub fn load_schemas(&mut self, storage: &Storage) -> Result<()> {
237 let prefix = b"meta/schema/";
238 let items = storage.prefix_scan(prefix)?;
239
240 for (_key, value) in items {
241 let schema: Schema = serde_json::from_slice(&value)?;
242 self.schemas.insert(schema.entity.clone(), schema);
243 }
244
245 Ok(())
246 }
247
248 pub fn validate_fields_exist(
251 &self,
252 entity_name: &str,
253 fields: &[&str],
254 context: &str,
255 ) -> Result<()> {
256 if let Some(schema) = self.schemas.get(entity_name) {
257 for field in fields {
258 if *field != "id" && !schema.fields.contains_key(*field) {
259 return Err(Error::SchemaViolation {
260 entity: entity_name.to_string(),
261 field: (*field).to_string(),
262 reason: format!("{context} field does not exist in schema"),
263 });
264 }
265 }
266 }
267 Ok(())
268 }
269
270 #[must_use]
271 pub fn entity_names(&self) -> Vec<String> {
272 self.schemas.keys().cloned().collect()
273 }
274
275 pub fn remove_schema(&mut self, batch: &mut BatchWriter, entity: &str) {
276 self.schemas.remove(entity);
277 let key = keys::encode_schema_key(entity);
278 batch.remove(key);
279 }
280}
281
282impl Default for SchemaRegistry {
283 fn default() -> Self {
284 Self::new()
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_field_definition() {
        let field = FieldDefinition::new("name", FieldType::String)
            .required()
            .with_default(json!("anonymous"));

        assert_eq!(field.name, "name");
        assert_eq!(field.field_type, FieldType::String);
        assert!(field.required);
        assert_eq!(field.default, Some(json!("anonymous")));
    }

    #[test]
    fn test_field_definition_deserializes_lowercase_type_alias() {
        let field: FieldDefinition =
            serde_json::from_value(json!({"type": "string", "required": true})).unwrap();

        assert_eq!(field.name, "");
        assert_eq!(field.field_type, FieldType::String);
        assert!(field.required);
        assert_eq!(field.default, None);
    }

    #[test]
    fn test_schema_deserializes_without_version() {
        let schema: Schema =
            serde_json::from_value(json!({"entity": "users", "fields": {}})).unwrap();

        assert_eq!(schema.entity, "users");
        assert!(schema.fields.is_empty());
        assert_eq!(schema.version, 1);
    }

    #[test]
    fn test_schema_validation_valid() {
        let schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required())
            .add_field(FieldDefinition::new("age", FieldType::Number));

        let data = json!({
            "name": "Alice",
            "age": 30
        });

        assert!(schema.validate(&data).is_ok());
    }

    #[test]
    fn test_schema_validation_missing_required() {
        let schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required());

        let data = json!({
            "age": 30
        });

        let result = schema.validate(&data);
        assert!(result.is_err());
        assert!(matches!(result, Err(Error::SchemaViolation { .. })));
    }

    #[test]
    fn test_required_field_with_default_may_be_omitted() {
        let schema = Schema::new("users").add_field(
            FieldDefinition::new("status", FieldType::String)
                .required()
                .with_default(json!("active")),
        );

        assert!(schema.validate(&json!({})).is_ok());
    }

    #[test]
    fn test_schema_validation_wrong_type() {
        let schema = Schema::new("users").add_field(FieldDefinition::new("age", FieldType::Number));

        let data = json!({
            "age": "thirty"
        });

        let result = schema.validate(&data);
        assert!(result.is_err());
        assert!(matches!(result, Err(Error::SchemaViolation { .. })));
    }

    #[test]
    fn test_schema_validation_wrong_type_reason() {
        let schema = Schema::new("users").add_field(FieldDefinition::new("age", FieldType::Number));

        let result = schema.validate(&json!({"age": "thirty"}));
        assert!(matches!(
            &result,
            Err(Error::SchemaViolation { field, reason, .. })
                if field.as_str() == "age"
                    && reason.as_str() == "expected type Number, got string"
        ));
    }

    #[test]
    fn test_schema_validation_rejects_non_object_root() {
        let schema = Schema::new("users");

        let result = schema.validate(&json!(["not", "an", "object"]));
        assert!(matches!(
            &result,
            Err(Error::SchemaViolation { field, .. }) if field.as_str() == "<root>"
        ));
    }

    #[test]
    fn test_schema_apply_defaults() {
        let schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String))
            .add_field(
                FieldDefinition::new("status", FieldType::String).with_default(json!("active")),
            );

        let mut data = json!({
            "name": "Alice"
        });

        schema.apply_defaults(&mut data).unwrap();
        assert_eq!(data["status"], "active");
    }

    #[test]
    fn test_apply_defaults_keeps_existing_values() {
        let schema = Schema::new("users").add_field(
            FieldDefinition::new("status", FieldType::String).with_default(json!("active")),
        );

        let mut data = json!({"status": "banned"});
        schema.apply_defaults(&mut data).unwrap();
        assert_eq!(data, json!({"status": "banned"}));
    }

    #[test]
    fn test_apply_defaults_rejects_non_object() {
        let schema = Schema::new("users");

        let mut data = json!(42);
        assert!(matches!(
            schema.apply_defaults(&mut data),
            Err(Error::SchemaViolation { .. })
        ));
    }

    #[test]
    fn test_schema_registry() {
        let mut registry = SchemaRegistry::new();

        let schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required());

        registry.add_schema(schema);

        let valid = json!({"name": "Alice"});
        assert!(registry.validate_entity("users", &valid).is_ok());

        let invalid = json!({"age": 30});
        assert!(registry.validate_entity("users", &invalid).is_err());
    }

    #[test]
    fn test_registry_skips_unknown_entities() {
        let registry = SchemaRegistry::new();

        assert!(registry.validate_entity("ghosts", &json!("anything")).is_ok());

        let mut data = json!("anything");
        assert!(registry.apply_defaults("ghosts", &mut data).is_ok());
        assert_eq!(data, json!("anything"));
    }

    #[test]
    fn test_validate_fields_exist() {
        let mut registry = SchemaRegistry::new();
        registry.add_schema(
            Schema::new("users").add_field(FieldDefinition::new("name", FieldType::String)),
        );

        // "id" is implicitly allowed even though it is not declared.
        assert!(registry
            .validate_fields_exist("users", &["id", "name"], "sort")
            .is_ok());

        let result = registry.validate_fields_exist("users", &["email"], "sort");
        assert!(matches!(
            &result,
            Err(Error::SchemaViolation { field, reason, .. })
                if field.as_str() == "email"
                    && reason.as_str() == "sort field does not exist in schema"
        ));

        assert!(registry
            .validate_fields_exist("ghosts", &["anything"], "sort")
            .is_ok());
    }

    #[test]
    fn test_schema_version_stable_on_identical_fields() {
        let mut registry = SchemaRegistry::new();

        let schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required())
            .add_field(FieldDefinition::new("age", FieldType::Number));
        registry.add_schema(schema);
        assert_eq!(registry.get_schema("users").unwrap().version, 1);

        let same_schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required())
            .add_field(FieldDefinition::new("age", FieldType::Number));
        registry.add_schema(same_schema);
        assert_eq!(registry.get_schema("users").unwrap().version, 1);

        let different_schema = Schema::new("users")
            .add_field(FieldDefinition::new("name", FieldType::String).required())
            .add_field(FieldDefinition::new("email", FieldType::String));
        registry.add_schema(different_schema);
        assert_eq!(registry.get_schema("users").unwrap().version, 2);
    }
}