1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use serde_json::Value as JsonValue;
7use std::collections::HashMap;
8use std::sync::Arc;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
14pub enum CompatibilityMode {
15 None,
17 Backward,
19 Forward,
21 Full,
23}
24
25impl Default for CompatibilityMode {
26 fn default() -> Self {
27 Self::Backward
28 }
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct Schema {
34 pub id: Uuid,
36
37 pub subject: String,
39
40 pub version: u32,
42
43 pub schema: JsonValue,
45
46 pub created_at: DateTime<Utc>,
48
49 pub description: Option<String>,
51
52 pub tags: Vec<String>,
54}
55
56impl Schema {
57 pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
58 Self {
59 id: Uuid::new_v4(),
60 subject,
61 version,
62 schema,
63 created_at: Utc::now(),
64 description: None,
65 tags: Vec::new(),
66 }
67 }
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct RegisterSchemaRequest {
73 pub subject: String,
74 pub schema: JsonValue,
75 pub description: Option<String>,
76 pub tags: Option<Vec<String>>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct RegisterSchemaResponse {
82 pub schema_id: Uuid,
83 pub subject: String,
84 pub version: u32,
85 pub created_at: DateTime<Utc>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct ValidateEventRequest {
91 pub subject: String,
92 pub version: Option<u32>,
93 pub payload: JsonValue,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ValidateEventResponse {
99 pub valid: bool,
100 pub errors: Vec<String>,
101 pub schema_version: u32,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct CompatibilityCheckResult {
107 pub compatible: bool,
108 pub compatibility_mode: CompatibilityMode,
109 pub issues: Vec<String>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchemaRegistryStats {
115 pub total_schemas: usize,
116 pub total_subjects: usize,
117 pub validations_performed: u64,
118 pub validation_failures: u64,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct SchemaRegistryConfig {
124 pub default_compatibility: CompatibilityMode,
126
127 pub auto_register: bool,
129
130 pub enforce_validation: bool,
132}
133
134impl Default for SchemaRegistryConfig {
135 fn default() -> Self {
136 Self {
137 default_compatibility: CompatibilityMode::Backward,
138 auto_register: false,
139 enforce_validation: false,
140 }
141 }
142}
143
144pub struct SchemaRegistry {
146 schemas: Arc<DashMap<String, HashMap<u32, Schema>>>,
149
150 latest_versions: Arc<DashMap<String, u32>>,
152
153 compatibility_modes: Arc<DashMap<String, CompatibilityMode>>,
155
156 config: SchemaRegistryConfig,
158
159 stats: Arc<RwLock<SchemaRegistryStats>>,
161}
162
163impl SchemaRegistry {
164 pub fn new(config: SchemaRegistryConfig) -> Self {
165 Self {
166 schemas: Arc::new(DashMap::new()),
167 latest_versions: Arc::new(DashMap::new()),
168 compatibility_modes: Arc::new(DashMap::new()),
169 config,
170 stats: Arc::new(RwLock::new(SchemaRegistryStats {
171 total_schemas: 0,
172 total_subjects: 0,
173 validations_performed: 0,
174 validation_failures: 0,
175 })),
176 }
177 }
178
179 pub fn register_schema(
181 &self,
182 subject: String,
183 schema: JsonValue,
184 description: Option<String>,
185 tags: Option<Vec<String>>,
186 ) -> Result<RegisterSchemaResponse> {
187 let next_version = self.latest_versions.get(&subject).map(|v| *v + 1).unwrap_or(1);
189
190 if next_version > 1 {
192 let prev_version = next_version - 1;
193 if let Some(subject_schemas) = self.schemas.get(&subject) {
194 if let Some(prev_schema) = subject_schemas.get(&prev_version) {
195 let compatibility = self.get_compatibility_mode(&subject);
196 let check_result =
197 self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
198
199 if !check_result.compatible {
200 return Err(AllSourceError::ValidationError(format!(
201 "Schema compatibility check failed: {}",
202 check_result.issues.join(", ")
203 )));
204 }
205 }
206 }
207 }
208
209 let mut new_schema = Schema::new(subject.clone(), next_version, schema);
211 new_schema.description = description;
212 new_schema.tags = tags.unwrap_or_default();
213
214 let schema_id = new_schema.id;
215 let created_at = new_schema.created_at;
216
217 self.schemas.entry(subject.clone()).or_default().insert(next_version, new_schema);
219 self.latest_versions.insert(subject.clone(), next_version);
220
221 let mut stats = self.stats.write();
223 stats.total_schemas += 1;
224 if next_version == 1 {
225 stats.total_subjects += 1;
226 }
227
228 tracing::info!(
229 "📋 Registered schema v{} for subject '{}' (ID: {})",
230 next_version,
231 subject,
232 schema_id
233 );
234
235 Ok(RegisterSchemaResponse {
236 schema_id,
237 subject,
238 version: next_version,
239 created_at,
240 })
241 }
242
243 pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
245 let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
246 AllSourceError::ValidationError(format!("Subject not found: {subject}"))
247 })?;
248
249 let version = match version {
250 Some(v) => v,
251 None => {
252 *self.latest_versions.get(subject).ok_or_else(|| {
253 AllSourceError::ValidationError(format!("No versions for subject: {subject}"))
254 })?
255 }
256 };
257
258 subject_schemas.get(&version).cloned().ok_or_else(|| {
259 AllSourceError::ValidationError(format!(
260 "Schema version {} not found for subject: {}",
261 version, subject
262 ))
263 })
264 }
265
266 pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
268 let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
269 AllSourceError::ValidationError(format!("Subject not found: {subject}"))
270 })?;
271
272 let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
273 versions.sort_unstable();
274
275 Ok(versions)
276 }
277
278 pub fn list_subjects(&self) -> Vec<String> {
280 self.schemas.iter().map(|entry| entry.key().clone()).collect()
281 }
282
283 pub fn validate(
285 &self,
286 subject: &str,
287 version: Option<u32>,
288 payload: &JsonValue,
289 ) -> Result<ValidateEventResponse> {
290 let schema = self.get_schema(subject, version)?;
291
292 let validation_result = self.validate_json(payload, &schema.schema);
293
294 let mut stats = self.stats.write();
296 stats.validations_performed += 1;
297 if !validation_result.is_empty() {
298 stats.validation_failures += 1;
299 }
300
301 Ok(ValidateEventResponse {
302 valid: validation_result.is_empty(),
303 errors: validation_result,
304 schema_version: schema.version,
305 })
306 }
307
308 fn validate_json(&self, data: &JsonValue, schema: &JsonValue) -> Vec<String> {
310 let mut errors = Vec::new();
311
312 if let Some(required) = schema.get("required").and_then(|r| r.as_array()) {
317 if let Some(obj) = data.as_object() {
318 for req_field in required {
319 if let Some(field_name) = req_field.as_str() {
320 if !obj.contains_key(field_name) {
321 errors.push(format!("Missing required field: {field_name}"));
322 }
323 }
324 }
325 }
326 }
327
328 if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
330 let actual_type = match data {
331 JsonValue::Null => "null",
332 JsonValue::Bool(_) => "boolean",
333 JsonValue::Number(_) => "number",
334 JsonValue::String(_) => "string",
335 JsonValue::Array(_) => "array",
336 JsonValue::Object(_) => "object",
337 };
338
339 if expected_type != actual_type {
340 errors.push(format!(
341 "Type mismatch: expected {}, got {}",
342 expected_type, actual_type
343 ));
344 }
345 }
346
347 if let (Some(properties), Some(data_obj)) = (
349 schema.get("properties").and_then(|p| p.as_object()),
350 data.as_object(),
351 ) {
352 for (key, value) in data_obj {
353 if let Some(prop_schema) = properties.get(key) {
354 let nested_errors = self.validate_json(value, prop_schema);
355 for err in nested_errors {
356 errors.push(format!("{key}.{err}"));
357 }
358 }
359 }
360 }
361
362 errors
363 }
364
365 fn check_compatibility(
367 &self,
368 old_schema: &JsonValue,
369 new_schema: &JsonValue,
370 mode: CompatibilityMode,
371 ) -> Result<CompatibilityCheckResult> {
372 let mut issues = Vec::new();
373
374 match mode {
375 CompatibilityMode::None => {
376 return Ok(CompatibilityCheckResult {
377 compatible: true,
378 compatibility_mode: mode,
379 issues: Vec::new(),
380 });
381 }
382 CompatibilityMode::Backward => {
383 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
385 }
386 CompatibilityMode::Forward => {
387 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
389 }
390 CompatibilityMode::Full => {
391 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
393 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
394 }
395 }
396
397 Ok(CompatibilityCheckResult {
398 compatible: issues.is_empty(),
399 compatibility_mode: mode,
400 issues,
401 })
402 }
403
404 fn check_backward_compatibility(
405 &self,
406 old_schema: &JsonValue,
407 new_schema: &JsonValue,
408 ) -> Vec<String> {
409 let mut issues = Vec::new();
410
411 if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
413 let new_required = new_schema
414 .get("required")
415 .and_then(|r| r.as_array())
416 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
417 .unwrap_or_default();
418
419 for old_req in old_required {
420 if let Some(field_name) = old_req.as_str() {
421 if !new_required.contains(&field_name) {
422 issues.push(format!(
423 "Backward compatibility: required field '{}' removed",
424 field_name
425 ));
426 }
427 }
428 }
429 }
430
431 issues
432 }
433
434 fn check_forward_compatibility(
435 &self,
436 old_schema: &JsonValue,
437 new_schema: &JsonValue,
438 ) -> Vec<String> {
439 let mut issues = Vec::new();
440
441 if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
443 let old_required = old_schema
444 .get("required")
445 .and_then(|r| r.as_array())
446 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
447 .unwrap_or_default();
448
449 for new_req in new_required {
450 if let Some(field_name) = new_req.as_str() {
451 if !old_required.contains(&field_name) {
452 issues.push(format!(
453 "Forward compatibility: new required field '{}' added",
454 field_name
455 ));
456 }
457 }
458 }
459 }
460
461 issues
462 }
463
464 pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
466 self.compatibility_modes.insert(subject, mode);
467 }
468
469 pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
471 self.compatibility_modes
472 .get(subject)
473 .map(|entry| *entry.value())
474 .unwrap_or(self.config.default_compatibility)
475 }
476
477 pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
479 if let Some(mut subject_schemas) = self.schemas.get_mut(subject) {
480 if subject_schemas.remove(&version).is_some() {
481 tracing::info!("🗑️ Deleted schema v{} for subject '{}'", version, subject);
482
483 let mut stats = self.stats.write();
485 stats.total_schemas = stats.total_schemas.saturating_sub(1);
486
487 return Ok(true);
488 }
489 }
490
491 Ok(false)
492 }
493
494 pub fn stats(&self) -> SchemaRegistryStats {
496 self.stats.read().clone()
497 }
498
499 pub fn config(&self) -> &SchemaRegistryConfig {
501 &self.config
502 }
503}
504
505#[cfg(test)]
506mod tests {
507 use super::*;
508 use serde_json::json;
509
510 #[test]
511 fn test_schema_registration() {
512 let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
513
514 let schema = json!({
515 "type": "object",
516 "properties": {
517 "user_id": {"type": "string"},
518 "email": {"type": "string"}
519 },
520 "required": ["user_id", "email"]
521 });
522
523 let response = registry
524 .register_schema(
525 "user.created".to_string(),
526 schema,
527 Some("User creation event".to_string()),
528 None,
529 )
530 .unwrap();
531
532 assert_eq!(response.version, 1);
533 assert_eq!(response.subject, "user.created");
534 }
535
536 #[test]
537 fn test_schema_validation() {
538 let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
539
540 let schema = json!({
541 "type": "object",
542 "properties": {
543 "user_id": {"type": "string"},
544 "email": {"type": "string"}
545 },
546 "required": ["user_id", "email"]
547 });
548
549 registry
550 .register_schema("user.created".to_string(), schema, None, None)
551 .unwrap();
552
553 let valid_payload = json!({
555 "user_id": "123",
556 "email": "test@example.com"
557 });
558
559 let result = registry
560 .validate("user.created", None, &valid_payload)
561 .unwrap();
562 assert!(result.valid);
563
564 let invalid_payload = json!({
566 "user_id": "123"
567 });
568
569 let result = registry
570 .validate("user.created", None, &invalid_payload)
571 .unwrap();
572 assert!(!result.valid);
573 assert!(!result.errors.is_empty());
574 }
575
576 #[test]
577 fn test_backward_compatibility() {
578 let registry = SchemaRegistry::new(SchemaRegistryConfig {
579 default_compatibility: CompatibilityMode::Backward,
580 ..Default::default()
581 });
582
583 let schema_v1 = json!({
584 "type": "object",
585 "required": ["user_id", "email"]
586 });
587
588 registry
589 .register_schema("user.created".to_string(), schema_v1, None, None)
590 .unwrap();
591
592 let schema_v2 = json!({
594 "type": "object",
595 "required": ["user_id", "email"]
596 });
597
598 let result = registry.register_schema("user.created".to_string(), schema_v2, None, None);
599 assert!(result.is_ok());
600
601 let schema_v3 = json!({
603 "type": "object",
604 "required": ["user_id"]
605 });
606
607 let result = registry.register_schema("user.created".to_string(), schema_v3, None, None);
608 assert!(result.is_err());
609 }
610}