1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use serde_json::Value as JsonValue;
7use std::{collections::HashMap, sync::Arc};
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14 None,
16 #[default]
18 Backward,
19 Forward,
21 Full,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Schema {
28 pub id: Uuid,
30
31 pub subject: String,
33
34 pub version: u32,
36
37 pub schema: JsonValue,
39
40 pub created_at: DateTime<Utc>,
42
43 pub description: Option<String>,
45
46 pub tags: Vec<String>,
48}
49
50impl Schema {
51 pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
52 Self {
53 id: Uuid::new_v4(),
54 subject,
55 version,
56 schema,
57 created_at: Utc::now(),
58 description: None,
59 tags: Vec::new(),
60 }
61 }
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct RegisterSchemaRequest {
67 pub subject: String,
68 pub schema: JsonValue,
69 pub description: Option<String>,
70 pub tags: Option<Vec<String>>,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct RegisterSchemaResponse {
76 pub schema_id: Uuid,
77 pub subject: String,
78 pub version: u32,
79 pub created_at: DateTime<Utc>,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ValidateEventRequest {
85 pub subject: String,
86 pub version: Option<u32>,
87 pub payload: JsonValue,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct ValidateEventResponse {
93 pub valid: bool,
94 pub errors: Vec<String>,
95 pub schema_version: u32,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct CompatibilityCheckResult {
101 pub compatible: bool,
102 pub compatibility_mode: CompatibilityMode,
103 pub issues: Vec<String>,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct SchemaRegistryStats {
109 pub total_schemas: usize,
110 pub total_subjects: usize,
111 pub validations_performed: u64,
112 pub validation_failures: u64,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SchemaRegistryConfig {
118 pub default_compatibility: CompatibilityMode,
120
121 pub auto_register: bool,
123
124 pub enforce_validation: bool,
126}
127
128impl Default for SchemaRegistryConfig {
129 fn default() -> Self {
130 Self {
131 default_compatibility: CompatibilityMode::Backward,
132 auto_register: false,
133 enforce_validation: false,
134 }
135 }
136}
137
138pub struct SchemaRegistry {
140 schemas: Arc<DashMap<String, HashMap<u32, Schema>>>,
143
144 latest_versions: Arc<DashMap<String, u32>>,
146
147 compatibility_modes: Arc<DashMap<String, CompatibilityMode>>,
149
150 config: SchemaRegistryConfig,
152
153 stats: Arc<RwLock<SchemaRegistryStats>>,
155}
156
157impl SchemaRegistry {
158 pub fn new(config: SchemaRegistryConfig) -> Self {
159 Self {
160 schemas: Arc::new(DashMap::new()),
161 latest_versions: Arc::new(DashMap::new()),
162 compatibility_modes: Arc::new(DashMap::new()),
163 config,
164 stats: Arc::new(RwLock::new(SchemaRegistryStats {
165 total_schemas: 0,
166 total_subjects: 0,
167 validations_performed: 0,
168 validation_failures: 0,
169 })),
170 }
171 }
172
173 pub fn register_schema(
175 &self,
176 subject: String,
177 schema: JsonValue,
178 description: Option<String>,
179 tags: Option<Vec<String>>,
180 ) -> Result<RegisterSchemaResponse> {
181 let next_version = self.latest_versions.get(&subject).map_or(1, |v| *v + 1);
183
184 if next_version > 1 {
186 let prev_version = next_version - 1;
187 if let Some(subject_schemas) = self.schemas.get(&subject)
188 && let Some(prev_schema) = subject_schemas.get(&prev_version)
189 {
190 let compatibility = self.get_compatibility_mode(&subject);
191 let check_result =
192 self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
193
194 if !check_result.compatible {
195 return Err(AllSourceError::ValidationError(format!(
196 "Schema compatibility check failed: {}",
197 check_result.issues.join(", ")
198 )));
199 }
200 }
201 }
202
203 let mut new_schema = Schema::new(subject.clone(), next_version, schema);
205 new_schema.description = description;
206 new_schema.tags = tags.unwrap_or_default();
207
208 let schema_id = new_schema.id;
209 let created_at = new_schema.created_at;
210
211 self.schemas
213 .entry(subject.clone())
214 .or_default()
215 .insert(next_version, new_schema);
216 self.latest_versions.insert(subject.clone(), next_version);
217
218 let mut stats = self.stats.write();
220 stats.total_schemas += 1;
221 if next_version == 1 {
222 stats.total_subjects += 1;
223 }
224
225 tracing::info!(
226 "📋 Registered schema v{} for subject '{}' (ID: {})",
227 next_version,
228 subject,
229 schema_id
230 );
231
232 Ok(RegisterSchemaResponse {
233 schema_id,
234 subject,
235 version: next_version,
236 created_at,
237 })
238 }
239
240 pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
242 let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
243 AllSourceError::ValidationError(format!("Subject not found: {subject}"))
244 })?;
245
246 let version = match version {
247 Some(v) => v,
248 None => *self.latest_versions.get(subject).ok_or_else(|| {
249 AllSourceError::ValidationError(format!("No versions for subject: {subject}"))
250 })?,
251 };
252
253 subject_schemas.get(&version).cloned().ok_or_else(|| {
254 AllSourceError::ValidationError(format!(
255 "Schema version {version} not found for subject: {subject}"
256 ))
257 })
258 }
259
260 pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
262 let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
263 AllSourceError::ValidationError(format!("Subject not found: {subject}"))
264 })?;
265
266 let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
267 versions.sort_unstable();
268
269 Ok(versions)
270 }
271
272 pub fn list_subjects(&self) -> Vec<String> {
274 self.schemas
275 .iter()
276 .map(|entry| entry.key().clone())
277 .collect()
278 }
279
280 pub fn validate(
282 &self,
283 subject: &str,
284 version: Option<u32>,
285 payload: &JsonValue,
286 ) -> Result<ValidateEventResponse> {
287 let schema = self.get_schema(subject, version)?;
288
289 let validation_result = Self::validate_json(payload, &schema.schema);
290
291 let mut stats = self.stats.write();
293 stats.validations_performed += 1;
294 if !validation_result.is_empty() {
295 stats.validation_failures += 1;
296 }
297
298 Ok(ValidateEventResponse {
299 valid: validation_result.is_empty(),
300 errors: validation_result,
301 schema_version: schema.version,
302 })
303 }
304
305 fn validate_json(data: &JsonValue, schema: &JsonValue) -> Vec<String> {
307 let mut errors = Vec::new();
308
309 if let Some(required) = schema.get("required").and_then(|r| r.as_array())
314 && let Some(obj) = data.as_object()
315 {
316 for req_field in required {
317 if let Some(field_name) = req_field.as_str()
318 && !obj.contains_key(field_name)
319 {
320 errors.push(format!("Missing required field: {field_name}"));
321 }
322 }
323 }
324
325 if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
327 let actual_type = match data {
328 JsonValue::Null => "null",
329 JsonValue::Bool(_) => "boolean",
330 JsonValue::Number(_) => "number",
331 JsonValue::String(_) => "string",
332 JsonValue::Array(_) => "array",
333 JsonValue::Object(_) => "object",
334 };
335
336 if expected_type != actual_type {
337 errors.push(format!(
338 "Type mismatch: expected {expected_type}, got {actual_type}"
339 ));
340 }
341 }
342
343 if let (Some(properties), Some(data_obj)) = (
345 schema.get("properties").and_then(|p| p.as_object()),
346 data.as_object(),
347 ) {
348 for (key, value) in data_obj {
349 if let Some(prop_schema) = properties.get(key) {
350 let nested_errors = Self::validate_json(value, prop_schema);
351 for err in nested_errors {
352 errors.push(format!("{key}.{err}"));
353 }
354 }
355 }
356 }
357
358 errors
359 }
360
361 fn check_compatibility(
363 &self,
364 old_schema: &JsonValue,
365 new_schema: &JsonValue,
366 mode: CompatibilityMode,
367 ) -> Result<CompatibilityCheckResult> {
368 let mut issues = Vec::new();
369
370 match mode {
371 CompatibilityMode::None => {
372 return Ok(CompatibilityCheckResult {
373 compatible: true,
374 compatibility_mode: mode,
375 issues: Vec::new(),
376 });
377 }
378 CompatibilityMode::Backward => {
379 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
381 }
382 CompatibilityMode::Forward => {
383 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
385 }
386 CompatibilityMode::Full => {
387 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
389 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
390 }
391 }
392
393 Ok(CompatibilityCheckResult {
394 compatible: issues.is_empty(),
395 compatibility_mode: mode,
396 issues,
397 })
398 }
399
400 fn check_backward_compatibility(
401 &self,
402 old_schema: &JsonValue,
403 new_schema: &JsonValue,
404 ) -> Vec<String> {
405 let mut issues = Vec::new();
406
407 if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
409 let new_required = new_schema
410 .get("required")
411 .and_then(|r| r.as_array())
412 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
413 .unwrap_or_default();
414
415 for old_req in old_required {
416 if let Some(field_name) = old_req.as_str()
417 && !new_required.contains(&field_name)
418 {
419 issues.push(format!(
420 "Backward compatibility: required field '{field_name}' removed"
421 ));
422 }
423 }
424 }
425
426 issues
427 }
428
429 fn check_forward_compatibility(
430 &self,
431 old_schema: &JsonValue,
432 new_schema: &JsonValue,
433 ) -> Vec<String> {
434 let mut issues = Vec::new();
435
436 if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
438 let old_required = old_schema
439 .get("required")
440 .and_then(|r| r.as_array())
441 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
442 .unwrap_or_default();
443
444 for new_req in new_required {
445 if let Some(field_name) = new_req.as_str()
446 && !old_required.contains(&field_name)
447 {
448 issues.push(format!(
449 "Forward compatibility: new required field '{field_name}' added"
450 ));
451 }
452 }
453 }
454
455 issues
456 }
457
458 pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
460 self.compatibility_modes.insert(subject, mode);
461 }
462
463 pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
465 self.compatibility_modes
466 .get(subject)
467 .map_or(self.config.default_compatibility, |entry| *entry.value())
468 }
469
470 pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
472 if let Some(mut subject_schemas) = self.schemas.get_mut(subject)
473 && subject_schemas.remove(&version).is_some()
474 {
475 tracing::info!("🗑️ Deleted schema v{} for subject '{}'", version, subject);
476
477 let mut stats = self.stats.write();
479 stats.total_schemas = stats.total_schemas.saturating_sub(1);
480
481 return Ok(true);
482 }
483
484 Ok(false)
485 }
486
487 pub fn stats(&self) -> SchemaRegistryStats {
489 self.stats.read().clone()
490 }
491
492 pub fn config(&self) -> &SchemaRegistryConfig {
494 &self.config
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Registry built from the default configuration.
    fn default_registry() -> SchemaRegistry {
        SchemaRegistry::new(SchemaRegistryConfig::default())
    }

    /// Object schema with string `user_id` and `email`, both required.
    fn user_schema() -> serde_json::Value {
        json!({
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["user_id", "email"]
        })
    }

    #[test]
    fn test_schema_registration() {
        let registry = default_registry();

        let registered = registry
            .register_schema(
                "user.created".to_string(),
                user_schema(),
                Some("User creation event".to_string()),
                None,
            )
            .unwrap();

        assert_eq!(registered.version, 1);
        assert_eq!(registered.subject, "user.created");
    }

    #[test]
    fn test_schema_validation() {
        let registry = default_registry();
        registry
            .register_schema("user.created".to_string(), user_schema(), None, None)
            .unwrap();

        // Both required fields present.
        let complete = json!({
            "user_id": "123",
            "email": "test@example.com"
        });
        let outcome = registry.validate("user.created", None, &complete).unwrap();
        assert!(outcome.valid);

        // `email` missing.
        let partial = json!({
            "user_id": "123"
        });
        let outcome = registry.validate("user.created", None, &partial).unwrap();
        assert!(!outcome.valid);
        assert!(!outcome.errors.is_empty());
    }

    #[test]
    fn test_backward_compatibility() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig {
            default_compatibility: CompatibilityMode::Backward,
            ..Default::default()
        });
        let subject = "user.created";

        let v1 = json!({
            "type": "object",
            "required": ["user_id", "email"]
        });
        registry
            .register_schema(subject.to_string(), v1, None, None)
            .unwrap();

        // Same required fields: accepted.
        let v2 = json!({
            "type": "object",
            "required": ["user_id", "email"]
        });
        let same_fields = registry.register_schema(subject.to_string(), v2, None, None);
        assert!(same_fields.is_ok());

        // `email` no longer required: rejected by the backward check.
        let v3 = json!({
            "type": "object",
            "required": ["user_id"]
        });
        let dropped_email = registry.register_schema(subject.to_string(), v3, None, None);
        assert!(dropped_email.is_err());
    }
}