1use std::collections::HashMap;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use jsonschema::JSONSchema;
10use thiserror::Error;
11use anyhow::Result;
12
13pub mod compatibility;
14pub use compatibility::CompatibilityMode;
15
16
17#[derive(Debug, Serialize, Deserialize)]
19pub struct Schema {
20 pub id: String,
21 pub name: String,
22 pub version: u32,
23 pub description: Option<String>,
24 pub created_at: DateTime<Utc>,
25 pub updated_at: DateTime<Utc>,
26 pub body: serde_json::Value,
27 #[serde(skip)]
28 compiled_schema: Option<Box<JSONSchema>>,
29}
30
31impl Clone for Schema {
32 fn clone(&self) -> Self {
33 let mut cloned = Self {
34 id: self.id.clone(),
35 name: self.name.clone(),
36 version: self.version,
37 description: self.description.clone(),
38 created_at: self.created_at,
39 updated_at: self.updated_at,
40 body: self.body.clone(),
41 compiled_schema: None, };
43 if self.compiled_schema.is_some() {
45 let _ = cloned.compile();
46 }
47 cloned
48 }
49}
50
51impl Schema {
52 pub fn compile(&mut self) -> Result<()> {
54 let compiled = JSONSchema::compile(&self.body)
55 .map_err(|e| SchemaError::CompilationError(e.to_string()))?;
56 self.compiled_schema = Some(Box::new(compiled));
57 Ok(())
58 }
59
60 pub fn validate(&self, instance: &serde_json::Value) -> Result<(), Vec<String>> {
62 if let Some(ref compiled) = self.compiled_schema {
63 compiled.as_ref().validate(instance)
64 .map_err(|errors| errors.map(|e| e.to_string()).collect())
65 } else {
66 Err(vec!["Schema is not compiled".to_string()])
67 }
68 }
69}
70
71
72#[derive(Error, Debug, PartialEq)]
74pub enum SchemaError {
75 #[error("Schema with ID '{0}' already exists")]
76 SchemaAlreadyExists(String),
77 #[error("Schema with ID '{0}' not found")]
78 SchemaNotFound(String),
79 #[error("Version '{1}' for schema '{0}' already exists")]
80 VersionAlreadyExists(String, u32),
81 #[error("Invalid schema format: {0}")]
82 InvalidSchema(String),
83 #[error("Failed to compile schema: {0}")]
84 CompilationError(String),
85 #[error("Schema compatibility check failed for version {1} of schema '{0}': {2}")]
86 CompatibilityError(String, u32, String),
87}
88
89
90#[derive(Debug)]
92pub struct SchemaRegistry {
93 schemas: HashMap<String, HashMap<u32, Schema>>,
95 compatibility_mode: CompatibilityMode,
97}
98
99impl SchemaRegistry {
100 pub fn new(compatibility_mode: CompatibilityMode) -> Self {
102 Self {
103 schemas: HashMap::new(),
104 compatibility_mode,
105 }
106 }
107
108 pub fn register_schema(&mut self, mut schema: Schema) -> Result<Schema, SchemaError> {
114 schema.compile().map_err(|e| SchemaError::InvalidSchema(e.to_string()))?;
115
116 if let Some(versions) = self.schemas.get(&schema.id) {
117 if let Some(latest_version) = versions.keys().max() {
118 if schema.version > *latest_version {
119 let latest_schema = &versions[latest_version];
120 self.check_compatibility(latest_schema, &schema)?;
121 }
122 }
123 }
124
125 let versions = self.schemas.entry(schema.id.clone()).or_insert_with(HashMap::new);
126
127 if versions.contains_key(&schema.version) {
128 return Err(SchemaError::VersionAlreadyExists(schema.id, schema.version));
129 }
130
131 versions.insert(schema.version, schema.clone());
132 Ok(schema)
133 }
134
135 pub fn get_schema(&self, id: &str, version: u32) -> Option<&Schema> {
137 self.schemas.get(id).and_then(|versions| versions.get(&version))
138 }
139
140 pub fn get_latest_schema(&self, id: &str) -> Option<&Schema> {
142 self.schemas.get(id).and_then(|versions| {
143 versions
144 .keys()
145 .max()
146 .and_then(|latest_version| versions.get(latest_version))
147 })
148 }
149
150 fn check_compatibility(&self, old_schema: &Schema, new_schema: &Schema) -> Result<(), SchemaError> {
152 compatibility::check(&old_schema.body, &new_schema.body, self.compatibility_mode).map_err(
153 |e| {
154 SchemaError::CompatibilityError(
155 new_schema.id.clone(),
156 new_schema.version,
157 e,
158 )
159 },
160 )
161 }
162}
163
164impl Default for SchemaRegistry {
165 fn default() -> Self {
166 Self::new(CompatibilityMode::None)
167 }
168}
169
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174 use serde_json::json;
175
176 fn create_test_schema(id: &str, version: u32) -> Schema {
177 Schema {
178 id: id.to_string(),
179 name: format!("Test Schema {}", id),
180 version,
181 description: Some("A schema for testing".to_string()),
182 created_at: Utc::now(),
183 updated_at: Utc::now(),
184 body: json!({
185 "type": "object",
186 "properties": {
187 "name": { "type": "string" }
188 },
189 "required": ["name"]
190 }),
191 compiled_schema: None,
192 }
193 }
194
195 #[test]
196 fn test_register_and_get_schema() {
197 let mut registry = SchemaRegistry::new(CompatibilityMode::None);
198 let schema = create_test_schema("user", 1);
199
200 let registered_schema = registry.register_schema(schema.clone()).unwrap();
201 assert_eq!(registered_schema, schema);
202
203 let retrieved_schema = registry.get_schema("user", 1).unwrap();
204 assert_eq!(retrieved_schema, &schema);
205 }
206
207 #[test]
208 fn test_register_duplicate_version() {
209 let mut registry = SchemaRegistry::new(CompatibilityMode::None);
210 let schema1 = create_test_schema("user", 1);
211 let schema2 = create_test_schema("user", 1);
212
213 registry.register_schema(schema1).unwrap();
214 let result = registry.register_schema(schema2);
215
216 assert_eq!(result, Err(SchemaError::VersionAlreadyExists("user".to_string(), 1)));
217 }
218
219 #[test]
220 fn test_get_non_existent_schema() {
221 let registry = SchemaRegistry::new(CompatibilityMode::None);
222 assert!(registry.get_schema("user", 1).is_none());
223 }
224
225 #[test]
226 fn test_get_latest_schema() {
227 let mut registry = SchemaRegistry::new(CompatibilityMode::None);
228 let schema1 = create_test_schema("user", 1);
229 let schema2 = create_test_schema("user", 2);
230
231 registry.register_schema(schema1).unwrap();
232 registry.register_schema(schema2.clone()).unwrap();
233
234 let latest = registry.get_latest_schema("user").unwrap();
235 assert_eq!(latest.version, 2);
236 }
237
238 #[test]
239 fn test_schema_validation() {
240 let mut schema = create_test_schema("user", 1);
241 schema.compile().unwrap();
242
243 let valid_instance = json!({ "name": "Alice" });
244 assert!(schema.validate(&valid_instance).is_ok());
245
246 let invalid_instance = json!({ "age": 30 });
247 assert!(schema.validate(&invalid_instance).is_err());
248 }
249
250 #[test]
251 fn test_invalid_schema_compilation() {
252 let mut registry = SchemaRegistry::new(CompatibilityMode::None);
253 let mut schema = create_test_schema("user", 1);
254 schema.body = json!({ "type": "invalid" }); let result = registry.register_schema(schema);
257 assert!(matches!(result, Err(SchemaError::InvalidSchema(_))));
258 }
259
260 #[test]
261 fn test_backward_compatibility_ok() {
262 let mut registry = SchemaRegistry::new(CompatibilityMode::Backward);
263 let schema1 = create_test_schema("user", 1);
264 registry.register_schema(schema1.clone()).unwrap();
265
266 let mut schema2 = create_test_schema("user", 2);
268 schema2.body = json!({
269 "type": "object",
270 "properties": {
271 "name": { "type": "string" },
272 "email": { "type": "string" }
273 },
274 "required": ["name"]
275 });
276
277 assert!(registry.register_schema(schema2).is_ok());
278 }
279
280 #[test]
281 fn test_backward_compatibility_fail() {
282 let mut registry = SchemaRegistry::new(CompatibilityMode::Backward);
283 let schema1 = create_test_schema("user", 1);
284 registry.register_schema(schema1.clone()).unwrap();
285
286 let mut schema2 = create_test_schema("user", 2);
288 schema2.body = json!({
289 "type": "object",
290 "properties": {
291 "name": { "type": "string" },
292 "age": { "type": "integer" }
293 },
294 "required": ["name", "age"]
295 });
296
297 let result = registry.register_schema(schema2);
298 assert!(matches!(result, Err(SchemaError::CompatibilityError(_, _, _))));
299 assert!(result.unwrap_err().to_string().contains("Optional properties {\"age\"} were made required."));
300 }
301
302 #[test]
303 fn test_forward_compatibility_fail() {
304 let mut registry = SchemaRegistry::new(CompatibilityMode::Forward);
305 let mut schema1 = create_test_schema("user", 1);
306 schema1.body = json!({
307 "type": "object",
308 "properties": {
309 "name": { "type": "string" }
310 },
311 "required": ["name"],
312 "additionalProperties": false
313 });
314 registry.register_schema(schema1.clone()).unwrap();
315
316 let mut schema2 = create_test_schema("user", 2);
319 schema2.body = json!({
320 "type": "object",
321 "properties": {
322 "name": { "type": "string" },
323 "email": { "type": "string" }
324 },
325 "required": ["name"]
326 });
327
328 let result = registry.register_schema(schema2);
329 assert!(matches!(result, Err(SchemaError::CompatibilityError(_, _, _))));
330 assert!(result.unwrap_err().to_string().contains("New properties {\"email\"} added, but old schema does not allow additional properties."));
331 }
332}