1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use serde_json::Value as JsonValue;
7use std::{collections::HashMap, sync::Arc};
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14 None,
16 #[default]
18 Backward,
19 Forward,
21 Full,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Schema {
28 pub id: Uuid,
30
31 pub subject: String,
33
34 pub version: u32,
36
37 pub schema: JsonValue,
39
40 pub created_at: DateTime<Utc>,
42
43 pub description: Option<String>,
45
46 pub tags: Vec<String>,
48}
49
50impl Schema {
51 pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
52 Self {
53 id: Uuid::new_v4(),
54 subject,
55 version,
56 schema,
57 created_at: Utc::now(),
58 description: None,
59 tags: Vec::new(),
60 }
61 }
62}
63
/// Input for registering a new schema version.
///
/// The fields mirror the parameters of `SchemaRegistry::register_schema`.
/// Presumably this is deserialized from an API request body — TODO confirm
/// against the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterSchemaRequest {
    /// Subject to register the schema under.
    pub subject: String,
    /// The schema document to store.
    pub schema: JsonValue,
    /// Optional human-readable description for the new version.
    pub description: Option<String>,
    /// Optional tags; `register_schema` stores an empty list when this is `None`.
    pub tags: Option<Vec<String>>,
}
72
/// Result returned by `SchemaRegistry::register_schema` for a newly stored version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterSchemaResponse {
    /// ID generated for the new schema version.
    pub schema_id: Uuid,
    /// Subject the version was registered under.
    pub subject: String,
    /// Version number assigned to the new schema.
    pub version: u32,
    /// Creation timestamp of the new version (UTC).
    pub created_at: DateTime<Utc>,
}
81
/// Input for validating a payload against a registered schema.
///
/// The fields mirror the parameters of `SchemaRegistry::validate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidateEventRequest {
    /// Subject whose schema the payload is validated against.
    pub subject: String,
    /// Specific version to use; `None` presumably selects the latest version
    /// (as `SchemaRegistry::validate` does) — TODO confirm caller wiring.
    pub version: Option<u32>,
    /// The JSON payload to validate.
    pub payload: JsonValue,
}
89
/// Outcome of `SchemaRegistry::validate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidateEventResponse {
    /// `true` exactly when `errors` is empty.
    pub valid: bool,
    /// Human-readable validation errors (empty when the payload is valid).
    pub errors: Vec<String>,
    /// Version of the schema the payload was validated against.
    pub schema_version: u32,
}
97
/// Result of comparing a candidate schema with a previous version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompatibilityCheckResult {
    /// `true` when no compatibility issues were found.
    pub compatible: bool,
    /// Mode the check was performed under.
    pub compatibility_mode: CompatibilityMode,
    /// Descriptions of every detected incompatibility.
    pub issues: Vec<String>,
}
105
/// Snapshot of the registry's running counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaRegistryStats {
    /// Schema versions currently stored (decremented when a version is deleted).
    pub total_schemas: usize,
    /// Subjects that have ever had a version registered (not decremented on delete).
    pub total_subjects: usize,
    /// Number of `validate` calls that found a schema to validate against.
    pub validations_performed: u64,
    /// Number of those validations that reported at least one error.
    pub validation_failures: u64,
}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SchemaRegistryConfig {
118 pub default_compatibility: CompatibilityMode,
120
121 pub auto_register: bool,
123
124 pub enforce_validation: bool,
126}
127
128impl Default for SchemaRegistryConfig {
129 fn default() -> Self {
130 Self {
131 default_compatibility: CompatibilityMode::Backward,
132 auto_register: false,
133 enforce_validation: false,
134 }
135 }
136}
137
138pub struct SchemaRegistry {
140 schemas: Arc<DashMap<String, HashMap<u32, Schema>>>,
143
144 latest_versions: Arc<DashMap<String, u32>>,
146
147 compatibility_modes: Arc<DashMap<String, CompatibilityMode>>,
149
150 config: SchemaRegistryConfig,
152
153 stats: Arc<RwLock<SchemaRegistryStats>>,
155}
156
157impl SchemaRegistry {
158 pub fn new(config: SchemaRegistryConfig) -> Self {
159 Self {
160 schemas: Arc::new(DashMap::new()),
161 latest_versions: Arc::new(DashMap::new()),
162 compatibility_modes: Arc::new(DashMap::new()),
163 config,
164 stats: Arc::new(RwLock::new(SchemaRegistryStats {
165 total_schemas: 0,
166 total_subjects: 0,
167 validations_performed: 0,
168 validation_failures: 0,
169 })),
170 }
171 }
172
173 pub fn register_schema(
175 &self,
176 subject: String,
177 schema: JsonValue,
178 description: Option<String>,
179 tags: Option<Vec<String>>,
180 ) -> Result<RegisterSchemaResponse> {
181 let next_version = self
183 .latest_versions
184 .get(&subject)
185 .map(|v| *v + 1)
186 .unwrap_or(1);
187
188 if next_version > 1 {
190 let prev_version = next_version - 1;
191 if let Some(subject_schemas) = self.schemas.get(&subject)
192 && let Some(prev_schema) = subject_schemas.get(&prev_version)
193 {
194 let compatibility = self.get_compatibility_mode(&subject);
195 let check_result =
196 self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
197
198 if !check_result.compatible {
199 return Err(AllSourceError::ValidationError(format!(
200 "Schema compatibility check failed: {}",
201 check_result.issues.join(", ")
202 )));
203 }
204 }
205 }
206
207 let mut new_schema = Schema::new(subject.clone(), next_version, schema);
209 new_schema.description = description;
210 new_schema.tags = tags.unwrap_or_default();
211
212 let schema_id = new_schema.id;
213 let created_at = new_schema.created_at;
214
215 self.schemas
217 .entry(subject.clone())
218 .or_default()
219 .insert(next_version, new_schema);
220 self.latest_versions.insert(subject.clone(), next_version);
221
222 let mut stats = self.stats.write();
224 stats.total_schemas += 1;
225 if next_version == 1 {
226 stats.total_subjects += 1;
227 }
228
229 tracing::info!(
230 "📋 Registered schema v{} for subject '{}' (ID: {})",
231 next_version,
232 subject,
233 schema_id
234 );
235
236 Ok(RegisterSchemaResponse {
237 schema_id,
238 subject,
239 version: next_version,
240 created_at,
241 })
242 }
243
244 pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
246 let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
247 AllSourceError::ValidationError(format!("Subject not found: {subject}"))
248 })?;
249
250 let version = match version {
251 Some(v) => v,
252 None => *self.latest_versions.get(subject).ok_or_else(|| {
253 AllSourceError::ValidationError(format!("No versions for subject: {subject}"))
254 })?,
255 };
256
257 subject_schemas.get(&version).cloned().ok_or_else(|| {
258 AllSourceError::ValidationError(format!(
259 "Schema version {} not found for subject: {}",
260 version, subject
261 ))
262 })
263 }
264
265 pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
267 let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
268 AllSourceError::ValidationError(format!("Subject not found: {subject}"))
269 })?;
270
271 let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
272 versions.sort_unstable();
273
274 Ok(versions)
275 }
276
277 pub fn list_subjects(&self) -> Vec<String> {
279 self.schemas
280 .iter()
281 .map(|entry| entry.key().clone())
282 .collect()
283 }
284
285 pub fn validate(
287 &self,
288 subject: &str,
289 version: Option<u32>,
290 payload: &JsonValue,
291 ) -> Result<ValidateEventResponse> {
292 let schema = self.get_schema(subject, version)?;
293
294 let validation_result = Self::validate_json(payload, &schema.schema);
295
296 let mut stats = self.stats.write();
298 stats.validations_performed += 1;
299 if !validation_result.is_empty() {
300 stats.validation_failures += 1;
301 }
302
303 Ok(ValidateEventResponse {
304 valid: validation_result.is_empty(),
305 errors: validation_result,
306 schema_version: schema.version,
307 })
308 }
309
310 fn validate_json(data: &JsonValue, schema: &JsonValue) -> Vec<String> {
312 let mut errors = Vec::new();
313
314 if let Some(required) = schema.get("required").and_then(|r| r.as_array())
319 && let Some(obj) = data.as_object()
320 {
321 for req_field in required {
322 if let Some(field_name) = req_field.as_str()
323 && !obj.contains_key(field_name)
324 {
325 errors.push(format!("Missing required field: {field_name}"));
326 }
327 }
328 }
329
330 if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
332 let actual_type = match data {
333 JsonValue::Null => "null",
334 JsonValue::Bool(_) => "boolean",
335 JsonValue::Number(_) => "number",
336 JsonValue::String(_) => "string",
337 JsonValue::Array(_) => "array",
338 JsonValue::Object(_) => "object",
339 };
340
341 if expected_type != actual_type {
342 errors.push(format!(
343 "Type mismatch: expected {}, got {}",
344 expected_type, actual_type
345 ));
346 }
347 }
348
349 if let (Some(properties), Some(data_obj)) = (
351 schema.get("properties").and_then(|p| p.as_object()),
352 data.as_object(),
353 ) {
354 for (key, value) in data_obj {
355 if let Some(prop_schema) = properties.get(key) {
356 let nested_errors = Self::validate_json(value, prop_schema);
357 for err in nested_errors {
358 errors.push(format!("{key}.{err}"));
359 }
360 }
361 }
362 }
363
364 errors
365 }
366
367 fn check_compatibility(
369 &self,
370 old_schema: &JsonValue,
371 new_schema: &JsonValue,
372 mode: CompatibilityMode,
373 ) -> Result<CompatibilityCheckResult> {
374 let mut issues = Vec::new();
375
376 match mode {
377 CompatibilityMode::None => {
378 return Ok(CompatibilityCheckResult {
379 compatible: true,
380 compatibility_mode: mode,
381 issues: Vec::new(),
382 });
383 }
384 CompatibilityMode::Backward => {
385 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
387 }
388 CompatibilityMode::Forward => {
389 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
391 }
392 CompatibilityMode::Full => {
393 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
395 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
396 }
397 }
398
399 Ok(CompatibilityCheckResult {
400 compatible: issues.is_empty(),
401 compatibility_mode: mode,
402 issues,
403 })
404 }
405
406 fn check_backward_compatibility(
407 &self,
408 old_schema: &JsonValue,
409 new_schema: &JsonValue,
410 ) -> Vec<String> {
411 let mut issues = Vec::new();
412
413 if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
415 let new_required = new_schema
416 .get("required")
417 .and_then(|r| r.as_array())
418 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
419 .unwrap_or_default();
420
421 for old_req in old_required {
422 if let Some(field_name) = old_req.as_str()
423 && !new_required.contains(&field_name)
424 {
425 issues.push(format!(
426 "Backward compatibility: required field '{}' removed",
427 field_name
428 ));
429 }
430 }
431 }
432
433 issues
434 }
435
436 fn check_forward_compatibility(
437 &self,
438 old_schema: &JsonValue,
439 new_schema: &JsonValue,
440 ) -> Vec<String> {
441 let mut issues = Vec::new();
442
443 if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
445 let old_required = old_schema
446 .get("required")
447 .and_then(|r| r.as_array())
448 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
449 .unwrap_or_default();
450
451 for new_req in new_required {
452 if let Some(field_name) = new_req.as_str()
453 && !old_required.contains(&field_name)
454 {
455 issues.push(format!(
456 "Forward compatibility: new required field '{}' added",
457 field_name
458 ));
459 }
460 }
461 }
462
463 issues
464 }
465
466 pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
468 self.compatibility_modes.insert(subject, mode);
469 }
470
471 pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
473 self.compatibility_modes
474 .get(subject)
475 .map(|entry| *entry.value())
476 .unwrap_or(self.config.default_compatibility)
477 }
478
479 pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
481 if let Some(mut subject_schemas) = self.schemas.get_mut(subject)
482 && subject_schemas.remove(&version).is_some()
483 {
484 tracing::info!("🗑️ Deleted schema v{} for subject '{}'", version, subject);
485
486 let mut stats = self.stats.write();
488 stats.total_schemas = stats.total_schemas.saturating_sub(1);
489
490 return Ok(true);
491 }
492
493 Ok(false)
494 }
495
496 pub fn stats(&self) -> SchemaRegistryStats {
498 self.stats.read().clone()
499 }
500
501 pub fn config(&self) -> &SchemaRegistryConfig {
503 &self.config
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Subject shared by every test below.
    const SUBJECT: &str = "user.created";

    /// Object schema with string `user_id` and `email` properties, both required.
    fn user_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["user_id", "email"]
        })
    }

    #[test]
    fn test_schema_registration() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        let registered = registry
            .register_schema(
                SUBJECT.to_string(),
                user_schema(),
                Some("User creation event".to_string()),
                None,
            )
            .unwrap();

        assert_eq!(registered.version, 1);
        assert_eq!(registered.subject, SUBJECT);
    }

    #[test]
    fn test_schema_validation() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
        registry
            .register_schema(SUBJECT.to_string(), user_schema(), None, None)
            .unwrap();

        // Both required fields present: accepted.
        let complete = json!({
            "user_id": "123",
            "email": "test@example.com"
        });
        let outcome = registry.validate(SUBJECT, None, &complete).unwrap();
        assert!(outcome.valid);

        // `email` missing: rejected with at least one error message.
        let partial = json!({
            "user_id": "123"
        });
        let outcome = registry.validate(SUBJECT, None, &partial).unwrap();
        assert!(!outcome.valid);
        assert!(!outcome.errors.is_empty());
    }

    #[test]
    fn test_backward_compatibility() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig {
            default_compatibility: CompatibilityMode::Backward,
            ..Default::default()
        });
        let requiring = |fields: &[&str]| json!({"type": "object", "required": fields});

        registry
            .register_schema(SUBJECT.to_string(), requiring(&["user_id", "email"]), None, None)
            .unwrap();

        // Same required set: accepted.
        let unchanged = registry.register_schema(
            SUBJECT.to_string(),
            requiring(&["user_id", "email"]),
            None,
            None,
        );
        assert!(unchanged.is_ok());

        // Dropping a required field: rejected.
        let narrowed =
            registry.register_schema(SUBJECT.to_string(), requiring(&["user_id"]), None, None);
        assert!(narrowed.is_err());
    }
}