kotoba_schema_registry/
lib.rs

1//! # kotoba-schema-registry
2//!
3//! A schema registry for managing and evolving schemas in Kotoba.
4//! Process network as GTS(DPO)+OpenGraph with Merkle DAG & PG view
5
6use std::collections::HashMap;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use jsonschema::JSONSchema;
10use thiserror::Error;
11use anyhow::Result;
12
13pub mod compatibility;
14pub use compatibility::CompatibilityMode;
15
16
17/// Represents a schema in the registry.
18#[derive(Debug, Serialize, Deserialize)]
19pub struct Schema {
20    pub id: String,
21    pub name: String,
22    pub version: u32,
23    pub description: Option<String>,
24    pub created_at: DateTime<Utc>,
25    pub updated_at: DateTime<Utc>,
26    pub body: serde_json::Value,
27    #[serde(skip)]
28    compiled_schema: Option<Box<JSONSchema>>,
29}
30
31impl Clone for Schema {
32    fn clone(&self) -> Self {
33        let mut cloned = Self {
34            id: self.id.clone(),
35            name: self.name.clone(),
36            version: self.version,
37            description: self.description.clone(),
38            created_at: self.created_at,
39            updated_at: self.updated_at,
40            body: self.body.clone(),
41            compiled_schema: None, // Don't clone the compiled schema
42        };
43        // Recompile if needed
44        if self.compiled_schema.is_some() {
45            let _ = cloned.compile();
46        }
47        cloned
48    }
49}
50
51impl Schema {
52    /// Compiles the JSON schema body for validation.
53    pub fn compile(&mut self) -> Result<()> {
54        let compiled = JSONSchema::compile(&self.body)
55            .map_err(|e| SchemaError::CompilationError(e.to_string()))?;
56        self.compiled_schema = Some(Box::new(compiled));
57        Ok(())
58    }
59
60    /// Validates a JSON value against the schema.
61    pub fn validate(&self, instance: &serde_json::Value) -> Result<(), Vec<String>> {
62        if let Some(ref compiled) = self.compiled_schema {
63            compiled.as_ref().validate(instance)
64                .map_err(|errors| errors.map(|e| e.to_string()).collect())
65        } else {
66            Err(vec!["Schema is not compiled".to_string()])
67        }
68    }
69}
70
71
72/// Errors that can occur within the schema registry.
73#[derive(Error, Debug, PartialEq)]
74pub enum SchemaError {
75    #[error("Schema with ID '{0}' already exists")]
76    SchemaAlreadyExists(String),
77    #[error("Schema with ID '{0}' not found")]
78    SchemaNotFound(String),
79    #[error("Version '{1}' for schema '{0}' already exists")]
80    VersionAlreadyExists(String, u32),
81    #[error("Invalid schema format: {0}")]
82    InvalidSchema(String),
83    #[error("Failed to compile schema: {0}")]
84    CompilationError(String),
85    #[error("Schema compatibility check failed for version {1} of schema '{0}': {2}")]
86    CompatibilityError(String, u32, String),
87}
88
89
90/// The schema registry.
91#[derive(Debug)]
92pub struct SchemaRegistry {
93    /// Storage for schemas, mapping a schema ID to a map of versions and schemas.
94    schemas: HashMap<String, HashMap<u32, Schema>>,
95    /// Default compatibility mode for this registry.
96    compatibility_mode: CompatibilityMode,
97}
98
99impl SchemaRegistry {
100    /// Creates a new, empty schema registry with a specific compatibility mode.
101    pub fn new(compatibility_mode: CompatibilityMode) -> Self {
102        Self {
103            schemas: HashMap::new(),
104            compatibility_mode,
105        }
106    }
107
108    /// Registers a new schema or a new version of an existing schema.
109    ///
110    /// If the schema ID does not exist, it will be created.
111    /// If the schema ID exists, it will add a new version if the version number is unique
112    /// and it passes the compatibility check.
113    pub fn register_schema(&mut self, mut schema: Schema) -> Result<Schema, SchemaError> {
114        schema.compile().map_err(|e| SchemaError::InvalidSchema(e.to_string()))?;
115        
116        if let Some(versions) = self.schemas.get(&schema.id) {
117            if let Some(latest_version) = versions.keys().max() {
118                if schema.version > *latest_version {
119                    let latest_schema = &versions[latest_version];
120                    self.check_compatibility(latest_schema, &schema)?;
121                }
122            }
123        }
124        
125        let versions = self.schemas.entry(schema.id.clone()).or_insert_with(HashMap::new);
126
127        if versions.contains_key(&schema.version) {
128            return Err(SchemaError::VersionAlreadyExists(schema.id, schema.version));
129        }
130
131        versions.insert(schema.version, schema.clone());
132        Ok(schema)
133    }
134
135    /// Retrieves a specific version of a schema by its ID.
136    pub fn get_schema(&self, id: &str, version: u32) -> Option<&Schema> {
137        self.schemas.get(id).and_then(|versions| versions.get(&version))
138    }
139
140    /// Retrieves the latest version of a schema by its ID.
141    pub fn get_latest_schema(&self, id: &str) -> Option<&Schema> {
142        self.schemas.get(id).and_then(|versions| {
143            versions
144                .keys()
145                .max()
146                .and_then(|latest_version| versions.get(latest_version))
147        })
148    }
149    
150    /// Checks compatibility between an old and a new schema.
151    fn check_compatibility(&self, old_schema: &Schema, new_schema: &Schema) -> Result<(), SchemaError> {
152        compatibility::check(&old_schema.body, &new_schema.body, self.compatibility_mode).map_err(
153            |e| {
154                SchemaError::CompatibilityError(
155                    new_schema.id.clone(),
156                    new_schema.version,
157                    e,
158                )
159            },
160        )
161    }
162}
163
164impl Default for SchemaRegistry {
165    fn default() -> Self {
166        Self::new(CompatibilityMode::None)
167    }
168}
169
170
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an uncompiled schema whose body is an object requiring a string `name`.
    fn create_test_schema(id: &str, version: u32) -> Schema {
        let body = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" }
            },
            "required": ["name"]
        });

        Schema {
            id: id.to_string(),
            name: format!("Test Schema {}", id),
            version,
            description: Some("A schema for testing".to_string()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            body,
            compiled_schema: None,
        }
    }

    #[test]
    fn test_register_and_get_schema() {
        let mut registry = SchemaRegistry::new(CompatibilityMode::None);
        let original = create_test_schema("user", 1);

        // The returned schema must match what was submitted...
        let returned = registry.register_schema(original.clone()).unwrap();
        assert_eq!(returned, original);

        // ...and so must the stored one.
        let stored = registry.get_schema("user", 1).unwrap();
        assert_eq!(stored, &original);
    }

    #[test]
    fn test_register_duplicate_version() {
        let mut registry = SchemaRegistry::new(CompatibilityMode::None);

        registry
            .register_schema(create_test_schema("user", 1))
            .unwrap();
        let second_attempt = registry.register_schema(create_test_schema("user", 1));

        let expected = Err(SchemaError::VersionAlreadyExists("user".to_string(), 1));
        assert_eq!(second_attempt, expected);
    }

    #[test]
    fn test_get_non_existent_schema() {
        let registry = SchemaRegistry::new(CompatibilityMode::None);
        assert!(registry.get_schema("user", 1).is_none());
    }

    #[test]
    fn test_get_latest_schema() {
        let mut registry = SchemaRegistry::new(CompatibilityMode::None);

        for version in [1, 2] {
            registry
                .register_schema(create_test_schema("user", version))
                .unwrap();
        }

        let latest = registry.get_latest_schema("user").unwrap();
        assert_eq!(latest.version, 2);
    }

    #[test]
    fn test_schema_validation() {
        let mut schema = create_test_schema("user", 1);
        schema.compile().unwrap();

        // Has the required `name` property.
        let conforming = json!({ "name": "Alice" });
        assert!(schema.validate(&conforming).is_ok());

        // Lacks the required `name` property.
        let non_conforming = json!({ "age": 30 });
        assert!(schema.validate(&non_conforming).is_err());
    }

    #[test]
    fn test_invalid_schema_compilation() {
        let mut registry = SchemaRegistry::new(CompatibilityMode::None);
        let mut schema = create_test_schema("user", 1);
        // "invalid" is not a valid JSON Schema type name.
        schema.body = json!({ "type": "invalid" });

        let outcome = registry.register_schema(schema);
        assert!(matches!(outcome, Err(SchemaError::InvalidSchema(_))));
    }

    #[test]
    fn test_backward_compatibility_ok() {
        let mut registry = SchemaRegistry::new(CompatibilityMode::Backward);
        registry
            .register_schema(create_test_schema("user", 1))
            .unwrap();

        // Version 2 adds an optional `email` field, which is backward compatible.
        let mut next = create_test_schema("user", 2);
        next.body = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "email": { "type": "string" }
            },
            "required": ["name"]
        });

        assert!(registry.register_schema(next).is_ok());
    }

    #[test]
    fn test_backward_compatibility_fail() {
        let mut registry = SchemaRegistry::new(CompatibilityMode::Backward);
        registry
            .register_schema(create_test_schema("user", 1))
            .unwrap();

        // Version 2 adds a new required field `age`, which is NOT backward compatible.
        let mut next = create_test_schema("user", 2);
        next.body = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "age": { "type": "integer" }
            },
            "required": ["name", "age"]
        });

        let err = registry.register_schema(next).unwrap_err();
        assert!(matches!(err, SchemaError::CompatibilityError(_, _, _)));
        assert!(err
            .to_string()
            .contains("Optional properties {\"age\"} were made required."));
    }

    #[test]
    fn test_forward_compatibility_fail() {
        let mut registry = SchemaRegistry::new(CompatibilityMode::Forward);

        // Version 1 forbids any property it does not list.
        let mut first = create_test_schema("user", 1);
        first.body = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" }
            },
            "required": ["name"],
            "additionalProperties": false
        });
        registry.register_schema(first).unwrap();

        // Version 2 adds an optional `email` field. This is not forward compatible
        // because the old schema has `additionalProperties: false`.
        let mut next = create_test_schema("user", 2);
        next.body = json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "email": { "type": "string" }
            },
            "required": ["name"]
        });

        let err = registry.register_schema(next).unwrap_err();
        assert!(matches!(err, SchemaError::CompatibilityError(_, _, _)));
        assert!(err.to_string().contains(
            "New properties {\"email\"} added, but old schema does not allow additional properties."
        ));
    }
}