1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use std::collections::HashMap;
7use std::sync::Arc;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14 None,
16 Backward,
18 Forward,
20 Full,
22}
23
24impl Default for CompatibilityMode {
25 fn default() -> Self {
26 Self::Backward
27 }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Schema {
33 pub id: Uuid,
35
36 pub subject: String,
38
39 pub version: u32,
41
42 pub schema: JsonValue,
44
45 pub created_at: DateTime<Utc>,
47
48 pub description: Option<String>,
50
51 pub tags: Vec<String>,
53}
54
55impl Schema {
56 pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
57 Self {
58 id: Uuid::new_v4(),
59 subject,
60 version,
61 schema,
62 created_at: Utc::now(),
63 description: None,
64 tags: Vec::new(),
65 }
66 }
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct RegisterSchemaRequest {
72 pub subject: String,
73 pub schema: JsonValue,
74 pub description: Option<String>,
75 pub tags: Option<Vec<String>>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct RegisterSchemaResponse {
81 pub schema_id: Uuid,
82 pub subject: String,
83 pub version: u32,
84 pub created_at: DateTime<Utc>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct ValidateEventRequest {
90 pub subject: String,
91 pub version: Option<u32>,
92 pub payload: JsonValue,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ValidateEventResponse {
98 pub valid: bool,
99 pub errors: Vec<String>,
100 pub schema_version: u32,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct CompatibilityCheckResult {
106 pub compatible: bool,
107 pub compatibility_mode: CompatibilityMode,
108 pub issues: Vec<String>,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct SchemaRegistryStats {
114 pub total_schemas: usize,
115 pub total_subjects: usize,
116 pub validations_performed: u64,
117 pub validation_failures: u64,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SchemaRegistryConfig {
123 pub default_compatibility: CompatibilityMode,
125
126 pub auto_register: bool,
128
129 pub enforce_validation: bool,
131}
132
133impl Default for SchemaRegistryConfig {
134 fn default() -> Self {
135 Self {
136 default_compatibility: CompatibilityMode::Backward,
137 auto_register: false,
138 enforce_validation: false,
139 }
140 }
141}
142
143pub struct SchemaRegistry {
145 schemas: Arc<RwLock<HashMap<String, HashMap<u32, Schema>>>>,
148
149 latest_versions: Arc<RwLock<HashMap<String, u32>>>,
151
152 compatibility_modes: Arc<RwLock<HashMap<String, CompatibilityMode>>>,
154
155 config: SchemaRegistryConfig,
157
158 stats: Arc<RwLock<SchemaRegistryStats>>,
160}
161
162impl SchemaRegistry {
163 pub fn new(config: SchemaRegistryConfig) -> Self {
164 Self {
165 schemas: Arc::new(RwLock::new(HashMap::new())),
166 latest_versions: Arc::new(RwLock::new(HashMap::new())),
167 compatibility_modes: Arc::new(RwLock::new(HashMap::new())),
168 config,
169 stats: Arc::new(RwLock::new(SchemaRegistryStats {
170 total_schemas: 0,
171 total_subjects: 0,
172 validations_performed: 0,
173 validation_failures: 0,
174 })),
175 }
176 }
177
178 pub fn register_schema(
180 &self,
181 subject: String,
182 schema: JsonValue,
183 description: Option<String>,
184 tags: Option<Vec<String>>,
185 ) -> Result<RegisterSchemaResponse> {
186 let mut schemas = self.schemas.write();
187 let mut latest_versions = self.latest_versions.write();
188
189 let subject_schemas = schemas.entry(subject.clone()).or_insert_with(HashMap::new);
191
192 let next_version = latest_versions.get(&subject).map(|v| v + 1).unwrap_or(1);
194
195 if next_version > 1 {
197 let prev_version = next_version - 1;
198 if let Some(prev_schema) = subject_schemas.get(&prev_version) {
199 let compatibility = self.get_compatibility_mode(&subject);
200 let check_result =
201 self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
202
203 if !check_result.compatible {
204 return Err(AllSourceError::ValidationError(format!(
205 "Schema compatibility check failed: {}",
206 check_result.issues.join(", ")
207 )));
208 }
209 }
210 }
211
212 let mut new_schema = Schema::new(subject.clone(), next_version, schema);
214 new_schema.description = description;
215 new_schema.tags = tags.unwrap_or_default();
216
217 let schema_id = new_schema.id;
218 let created_at = new_schema.created_at;
219
220 subject_schemas.insert(next_version, new_schema);
221 latest_versions.insert(subject.clone(), next_version);
222
223 let mut stats = self.stats.write();
225 stats.total_schemas += 1;
226 if next_version == 1 {
227 stats.total_subjects += 1;
228 }
229
230 tracing::info!(
231 "📋 Registered schema v{} for subject '{}' (ID: {})",
232 next_version,
233 subject,
234 schema_id
235 );
236
237 Ok(RegisterSchemaResponse {
238 schema_id,
239 subject,
240 version: next_version,
241 created_at,
242 })
243 }
244
245 pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
247 let schemas = self.schemas.read();
248
249 let subject_schemas = schemas.get(subject).ok_or_else(|| {
250 AllSourceError::ValidationError(format!("Subject not found: {}", subject))
251 })?;
252
253 let version = match version {
254 Some(v) => v,
255 None => {
256 let latest_versions = self.latest_versions.read();
257 *latest_versions.get(subject).ok_or_else(|| {
258 AllSourceError::ValidationError(format!("No versions for subject: {}", subject))
259 })?
260 }
261 };
262
263 subject_schemas.get(&version).cloned().ok_or_else(|| {
264 AllSourceError::ValidationError(format!(
265 "Schema version {} not found for subject: {}",
266 version, subject
267 ))
268 })
269 }
270
271 pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
273 let schemas = self.schemas.read();
274
275 let subject_schemas = schemas.get(subject).ok_or_else(|| {
276 AllSourceError::ValidationError(format!("Subject not found: {}", subject))
277 })?;
278
279 let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
280 versions.sort_unstable();
281
282 Ok(versions)
283 }
284
285 pub fn list_subjects(&self) -> Vec<String> {
287 let schemas = self.schemas.read();
288 schemas.keys().cloned().collect()
289 }
290
291 pub fn validate(
293 &self,
294 subject: &str,
295 version: Option<u32>,
296 payload: &JsonValue,
297 ) -> Result<ValidateEventResponse> {
298 let schema = self.get_schema(subject, version)?;
299
300 let validation_result = self.validate_json(payload, &schema.schema);
301
302 let mut stats = self.stats.write();
304 stats.validations_performed += 1;
305 if !validation_result.is_empty() {
306 stats.validation_failures += 1;
307 }
308
309 Ok(ValidateEventResponse {
310 valid: validation_result.is_empty(),
311 errors: validation_result,
312 schema_version: schema.version,
313 })
314 }
315
316 fn validate_json(&self, data: &JsonValue, schema: &JsonValue) -> Vec<String> {
318 let mut errors = Vec::new();
319
320 if let Some(required) = schema.get("required").and_then(|r| r.as_array()) {
325 if let Some(obj) = data.as_object() {
326 for req_field in required {
327 if let Some(field_name) = req_field.as_str() {
328 if !obj.contains_key(field_name) {
329 errors.push(format!("Missing required field: {}", field_name));
330 }
331 }
332 }
333 }
334 }
335
336 if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
338 let actual_type = match data {
339 JsonValue::Null => "null",
340 JsonValue::Bool(_) => "boolean",
341 JsonValue::Number(_) => "number",
342 JsonValue::String(_) => "string",
343 JsonValue::Array(_) => "array",
344 JsonValue::Object(_) => "object",
345 };
346
347 if expected_type != actual_type {
348 errors.push(format!(
349 "Type mismatch: expected {}, got {}",
350 expected_type, actual_type
351 ));
352 }
353 }
354
355 if let (Some(properties), Some(data_obj)) = (
357 schema.get("properties").and_then(|p| p.as_object()),
358 data.as_object(),
359 ) {
360 for (key, value) in data_obj {
361 if let Some(prop_schema) = properties.get(key) {
362 let nested_errors = self.validate_json(value, prop_schema);
363 for err in nested_errors {
364 errors.push(format!("{}.{}", key, err));
365 }
366 }
367 }
368 }
369
370 errors
371 }
372
373 fn check_compatibility(
375 &self,
376 old_schema: &JsonValue,
377 new_schema: &JsonValue,
378 mode: CompatibilityMode,
379 ) -> Result<CompatibilityCheckResult> {
380 let mut issues = Vec::new();
381
382 match mode {
383 CompatibilityMode::None => {
384 return Ok(CompatibilityCheckResult {
385 compatible: true,
386 compatibility_mode: mode,
387 issues: Vec::new(),
388 });
389 }
390 CompatibilityMode::Backward => {
391 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
393 }
394 CompatibilityMode::Forward => {
395 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
397 }
398 CompatibilityMode::Full => {
399 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
401 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
402 }
403 }
404
405 Ok(CompatibilityCheckResult {
406 compatible: issues.is_empty(),
407 compatibility_mode: mode,
408 issues,
409 })
410 }
411
412 fn check_backward_compatibility(
413 &self,
414 old_schema: &JsonValue,
415 new_schema: &JsonValue,
416 ) -> Vec<String> {
417 let mut issues = Vec::new();
418
419 if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
421 let new_required = new_schema
422 .get("required")
423 .and_then(|r| r.as_array())
424 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
425 .unwrap_or_default();
426
427 for old_req in old_required {
428 if let Some(field_name) = old_req.as_str() {
429 if !new_required.contains(&field_name) {
430 issues.push(format!(
431 "Backward compatibility: required field '{}' removed",
432 field_name
433 ));
434 }
435 }
436 }
437 }
438
439 issues
440 }
441
442 fn check_forward_compatibility(
443 &self,
444 old_schema: &JsonValue,
445 new_schema: &JsonValue,
446 ) -> Vec<String> {
447 let mut issues = Vec::new();
448
449 if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
451 let old_required = old_schema
452 .get("required")
453 .and_then(|r| r.as_array())
454 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
455 .unwrap_or_default();
456
457 for new_req in new_required {
458 if let Some(field_name) = new_req.as_str() {
459 if !old_required.contains(&field_name) {
460 issues.push(format!(
461 "Forward compatibility: new required field '{}' added",
462 field_name
463 ));
464 }
465 }
466 }
467 }
468
469 issues
470 }
471
472 pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
474 let mut modes = self.compatibility_modes.write();
475 modes.insert(subject, mode);
476 }
477
478 pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
480 let modes = self.compatibility_modes.read();
481 modes
482 .get(subject)
483 .copied()
484 .unwrap_or(self.config.default_compatibility)
485 }
486
487 pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
489 let mut schemas = self.schemas.write();
490
491 if let Some(subject_schemas) = schemas.get_mut(subject) {
492 if subject_schemas.remove(&version).is_some() {
493 tracing::info!("🗑️ Deleted schema v{} for subject '{}'", version, subject);
494
495 let mut stats = self.stats.write();
497 stats.total_schemas = stats.total_schemas.saturating_sub(1);
498
499 return Ok(true);
500 }
501 }
502
503 Ok(false)
504 }
505
506 pub fn stats(&self) -> SchemaRegistryStats {
508 self.stats.read().clone()
509 }
510
511 pub fn config(&self) -> &SchemaRegistryConfig {
513 &self.config
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use serde_json::json;
521
522 #[test]
523 fn test_schema_registration() {
524 let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
525
526 let schema = json!({
527 "type": "object",
528 "properties": {
529 "user_id": {"type": "string"},
530 "email": {"type": "string"}
531 },
532 "required": ["user_id", "email"]
533 });
534
535 let response = registry
536 .register_schema(
537 "user.created".to_string(),
538 schema,
539 Some("User creation event".to_string()),
540 None,
541 )
542 .unwrap();
543
544 assert_eq!(response.version, 1);
545 assert_eq!(response.subject, "user.created");
546 }
547
548 #[test]
549 fn test_schema_validation() {
550 let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
551
552 let schema = json!({
553 "type": "object",
554 "properties": {
555 "user_id": {"type": "string"},
556 "email": {"type": "string"}
557 },
558 "required": ["user_id", "email"]
559 });
560
561 registry
562 .register_schema("user.created".to_string(), schema, None, None)
563 .unwrap();
564
565 let valid_payload = json!({
567 "user_id": "123",
568 "email": "test@example.com"
569 });
570
571 let result = registry
572 .validate("user.created", None, &valid_payload)
573 .unwrap();
574 assert!(result.valid);
575
576 let invalid_payload = json!({
578 "user_id": "123"
579 });
580
581 let result = registry
582 .validate("user.created", None, &invalid_payload)
583 .unwrap();
584 assert!(!result.valid);
585 assert!(!result.errors.is_empty());
586 }
587
588 #[test]
589 fn test_backward_compatibility() {
590 let registry = SchemaRegistry::new(SchemaRegistryConfig {
591 default_compatibility: CompatibilityMode::Backward,
592 ..Default::default()
593 });
594
595 let schema_v1 = json!({
596 "type": "object",
597 "required": ["user_id", "email"]
598 });
599
600 registry
601 .register_schema("user.created".to_string(), schema_v1, None, None)
602 .unwrap();
603
604 let schema_v2 = json!({
606 "type": "object",
607 "required": ["user_id", "email"]
608 });
609
610 let result = registry.register_schema("user.created".to_string(), schema_v2, None, None);
611 assert!(result.is_ok());
612
613 let schema_v3 = json!({
615 "type": "object",
616 "required": ["user_id"]
617 });
618
619 let result = registry.register_schema("user.created".to_string(), schema_v3, None, None);
620 assert!(result.is_err());
621 }
622}