1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use std::collections::HashMap;
7use std::sync::Arc;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14 None,
16 Backward,
18 Forward,
20 Full,
22}
23
24impl Default for CompatibilityMode {
25 fn default() -> Self {
26 Self::Backward
27 }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Schema {
33 pub id: Uuid,
35
36 pub subject: String,
38
39 pub version: u32,
41
42 pub schema: JsonValue,
44
45 pub created_at: DateTime<Utc>,
47
48 pub description: Option<String>,
50
51 pub tags: Vec<String>,
53}
54
55impl Schema {
56 pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
57 Self {
58 id: Uuid::new_v4(),
59 subject,
60 version,
61 schema,
62 created_at: Utc::now(),
63 description: None,
64 tags: Vec::new(),
65 }
66 }
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct RegisterSchemaRequest {
72 pub subject: String,
73 pub schema: JsonValue,
74 pub description: Option<String>,
75 pub tags: Option<Vec<String>>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct RegisterSchemaResponse {
81 pub schema_id: Uuid,
82 pub subject: String,
83 pub version: u32,
84 pub created_at: DateTime<Utc>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct ValidateEventRequest {
90 pub subject: String,
91 pub version: Option<u32>,
92 pub payload: JsonValue,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ValidateEventResponse {
98 pub valid: bool,
99 pub errors: Vec<String>,
100 pub schema_version: u32,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct CompatibilityCheckResult {
106 pub compatible: bool,
107 pub compatibility_mode: CompatibilityMode,
108 pub issues: Vec<String>,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct SchemaRegistryStats {
114 pub total_schemas: usize,
115 pub total_subjects: usize,
116 pub validations_performed: u64,
117 pub validation_failures: u64,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SchemaRegistryConfig {
123 pub default_compatibility: CompatibilityMode,
125
126 pub auto_register: bool,
128
129 pub enforce_validation: bool,
131}
132
133impl Default for SchemaRegistryConfig {
134 fn default() -> Self {
135 Self {
136 default_compatibility: CompatibilityMode::Backward,
137 auto_register: false,
138 enforce_validation: false,
139 }
140 }
141}
142
143pub struct SchemaRegistry {
145 schemas: Arc<RwLock<HashMap<String, HashMap<u32, Schema>>>>,
148
149 latest_versions: Arc<RwLock<HashMap<String, u32>>>,
151
152 compatibility_modes: Arc<RwLock<HashMap<String, CompatibilityMode>>>,
154
155 config: SchemaRegistryConfig,
157
158 stats: Arc<RwLock<SchemaRegistryStats>>,
160}
161
162impl SchemaRegistry {
163 pub fn new(config: SchemaRegistryConfig) -> Self {
164 Self {
165 schemas: Arc::new(RwLock::new(HashMap::new())),
166 latest_versions: Arc::new(RwLock::new(HashMap::new())),
167 compatibility_modes: Arc::new(RwLock::new(HashMap::new())),
168 config,
169 stats: Arc::new(RwLock::new(SchemaRegistryStats {
170 total_schemas: 0,
171 total_subjects: 0,
172 validations_performed: 0,
173 validation_failures: 0,
174 })),
175 }
176 }
177
178 pub fn register_schema(
180 &self,
181 subject: String,
182 schema: JsonValue,
183 description: Option<String>,
184 tags: Option<Vec<String>>,
185 ) -> Result<RegisterSchemaResponse> {
186 let mut schemas = self.schemas.write();
187 let mut latest_versions = self.latest_versions.write();
188
189 let subject_schemas = schemas.entry(subject.clone()).or_insert_with(HashMap::new);
191
192 let next_version = latest_versions.get(&subject).map(|v| v + 1).unwrap_or(1);
194
195 if next_version > 1 {
197 let prev_version = next_version - 1;
198 if let Some(prev_schema) = subject_schemas.get(&prev_version) {
199 let compatibility = self.get_compatibility_mode(&subject);
200 let check_result = self.check_compatibility(
201 &prev_schema.schema,
202 &schema,
203 compatibility,
204 )?;
205
206 if !check_result.compatible {
207 return Err(AllSourceError::ValidationError(format!(
208 "Schema compatibility check failed: {}",
209 check_result.issues.join(", ")
210 )));
211 }
212 }
213 }
214
215 let mut new_schema = Schema::new(subject.clone(), next_version, schema);
217 new_schema.description = description;
218 new_schema.tags = tags.unwrap_or_default();
219
220 let schema_id = new_schema.id;
221 let created_at = new_schema.created_at;
222
223 subject_schemas.insert(next_version, new_schema);
224 latest_versions.insert(subject.clone(), next_version);
225
226 let mut stats = self.stats.write();
228 stats.total_schemas += 1;
229 if next_version == 1 {
230 stats.total_subjects += 1;
231 }
232
233 tracing::info!(
234 "📋 Registered schema v{} for subject '{}' (ID: {})",
235 next_version,
236 subject,
237 schema_id
238 );
239
240 Ok(RegisterSchemaResponse {
241 schema_id,
242 subject,
243 version: next_version,
244 created_at,
245 })
246 }
247
248 pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
250 let schemas = self.schemas.read();
251
252 let subject_schemas = schemas
253 .get(subject)
254 .ok_or_else(|| AllSourceError::ValidationError(format!("Subject not found: {}", subject)))?;
255
256 let version = match version {
257 Some(v) => v,
258 None => {
259 let latest_versions = self.latest_versions.read();
260 *latest_versions.get(subject).ok_or_else(|| {
261 AllSourceError::ValidationError(format!("No versions for subject: {}", subject))
262 })?
263 }
264 };
265
266 subject_schemas
267 .get(&version)
268 .cloned()
269 .ok_or_else(|| {
270 AllSourceError::ValidationError(format!(
271 "Schema version {} not found for subject: {}",
272 version, subject
273 ))
274 })
275 }
276
277 pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
279 let schemas = self.schemas.read();
280
281 let subject_schemas = schemas
282 .get(subject)
283 .ok_or_else(|| AllSourceError::ValidationError(format!("Subject not found: {}", subject)))?;
284
285 let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
286 versions.sort_unstable();
287
288 Ok(versions)
289 }
290
291 pub fn list_subjects(&self) -> Vec<String> {
293 let schemas = self.schemas.read();
294 schemas.keys().cloned().collect()
295 }
296
297 pub fn validate(
299 &self,
300 subject: &str,
301 version: Option<u32>,
302 payload: &JsonValue,
303 ) -> Result<ValidateEventResponse> {
304 let schema = self.get_schema(subject, version)?;
305
306 let validation_result = self.validate_json(payload, &schema.schema);
307
308 let mut stats = self.stats.write();
310 stats.validations_performed += 1;
311 if !validation_result.is_empty() {
312 stats.validation_failures += 1;
313 }
314
315 Ok(ValidateEventResponse {
316 valid: validation_result.is_empty(),
317 errors: validation_result,
318 schema_version: schema.version,
319 })
320 }
321
322 fn validate_json(&self, data: &JsonValue, schema: &JsonValue) -> Vec<String> {
324 let mut errors = Vec::new();
325
326 if let Some(required) = schema.get("required").and_then(|r| r.as_array()) {
331 if let Some(obj) = data.as_object() {
332 for req_field in required {
333 if let Some(field_name) = req_field.as_str() {
334 if !obj.contains_key(field_name) {
335 errors.push(format!("Missing required field: {}", field_name));
336 }
337 }
338 }
339 }
340 }
341
342 if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
344 let actual_type = match data {
345 JsonValue::Null => "null",
346 JsonValue::Bool(_) => "boolean",
347 JsonValue::Number(_) => "number",
348 JsonValue::String(_) => "string",
349 JsonValue::Array(_) => "array",
350 JsonValue::Object(_) => "object",
351 };
352
353 if expected_type != actual_type {
354 errors.push(format!(
355 "Type mismatch: expected {}, got {}",
356 expected_type, actual_type
357 ));
358 }
359 }
360
361 if let (Some(properties), Some(data_obj)) = (
363 schema.get("properties").and_then(|p| p.as_object()),
364 data.as_object(),
365 ) {
366 for (key, value) in data_obj {
367 if let Some(prop_schema) = properties.get(key) {
368 let nested_errors = self.validate_json(value, prop_schema);
369 for err in nested_errors {
370 errors.push(format!("{}.{}", key, err));
371 }
372 }
373 }
374 }
375
376 errors
377 }
378
379 fn check_compatibility(
381 &self,
382 old_schema: &JsonValue,
383 new_schema: &JsonValue,
384 mode: CompatibilityMode,
385 ) -> Result<CompatibilityCheckResult> {
386 let mut issues = Vec::new();
387
388 match mode {
389 CompatibilityMode::None => {
390 return Ok(CompatibilityCheckResult {
391 compatible: true,
392 compatibility_mode: mode,
393 issues: Vec::new(),
394 });
395 }
396 CompatibilityMode::Backward => {
397 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
399 }
400 CompatibilityMode::Forward => {
401 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
403 }
404 CompatibilityMode::Full => {
405 issues.extend(self.check_backward_compatibility(old_schema, new_schema));
407 issues.extend(self.check_forward_compatibility(old_schema, new_schema));
408 }
409 }
410
411 Ok(CompatibilityCheckResult {
412 compatible: issues.is_empty(),
413 compatibility_mode: mode,
414 issues,
415 })
416 }
417
418 fn check_backward_compatibility(
419 &self,
420 old_schema: &JsonValue,
421 new_schema: &JsonValue,
422 ) -> Vec<String> {
423 let mut issues = Vec::new();
424
425 if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
427 let new_required = new_schema
428 .get("required")
429 .and_then(|r| r.as_array())
430 .map(|arr| {
431 arr.iter()
432 .filter_map(|v| v.as_str())
433 .collect::<Vec<_>>()
434 })
435 .unwrap_or_default();
436
437 for old_req in old_required {
438 if let Some(field_name) = old_req.as_str() {
439 if !new_required.contains(&field_name) {
440 issues.push(format!(
441 "Backward compatibility: required field '{}' removed",
442 field_name
443 ));
444 }
445 }
446 }
447 }
448
449 issues
450 }
451
452 fn check_forward_compatibility(
453 &self,
454 old_schema: &JsonValue,
455 new_schema: &JsonValue,
456 ) -> Vec<String> {
457 let mut issues = Vec::new();
458
459 if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
461 let old_required = old_schema
462 .get("required")
463 .and_then(|r| r.as_array())
464 .map(|arr| {
465 arr.iter()
466 .filter_map(|v| v.as_str())
467 .collect::<Vec<_>>()
468 })
469 .unwrap_or_default();
470
471 for new_req in new_required {
472 if let Some(field_name) = new_req.as_str() {
473 if !old_required.contains(&field_name) {
474 issues.push(format!(
475 "Forward compatibility: new required field '{}' added",
476 field_name
477 ));
478 }
479 }
480 }
481 }
482
483 issues
484 }
485
486 pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
488 let mut modes = self.compatibility_modes.write();
489 modes.insert(subject, mode);
490 }
491
492 pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
494 let modes = self.compatibility_modes.read();
495 modes.get(subject).copied().unwrap_or(self.config.default_compatibility)
496 }
497
498 pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
500 let mut schemas = self.schemas.write();
501
502 if let Some(subject_schemas) = schemas.get_mut(subject) {
503 if subject_schemas.remove(&version).is_some() {
504 tracing::info!("🗑️ Deleted schema v{} for subject '{}'", version, subject);
505
506 let mut stats = self.stats.write();
508 stats.total_schemas = stats.total_schemas.saturating_sub(1);
509
510 return Ok(true);
511 }
512 }
513
514 Ok(false)
515 }
516
517 pub fn stats(&self) -> SchemaRegistryStats {
519 self.stats.read().clone()
520 }
521
522 pub fn config(&self) -> &SchemaRegistryConfig {
524 &self.config
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Subject used by every test.
    const SUBJECT: &str = "user.created";

    /// Object schema with two string properties, both required.
    fn user_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["user_id", "email"]
        })
    }

    /// Object schema that only declares the given required fields.
    fn required_only(fields: &[&str]) -> JsonValue {
        json!({
            "type": "object",
            "required": fields
        })
    }

    #[test]
    fn test_schema_registration() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        let response = registry
            .register_schema(
                SUBJECT.to_string(),
                user_schema(),
                Some("User creation event".to_string()),
                None,
            )
            .unwrap();

        assert_eq!(response.version, 1);
        assert_eq!(response.subject, "user.created");
    }

    #[test]
    fn test_schema_validation() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        registry
            .register_schema(SUBJECT.to_string(), user_schema(), None, None)
            .unwrap();

        // Both required fields present.
        let valid_payload = json!({
            "user_id": "123",
            "email": "test@example.com"
        });
        let outcome = registry.validate(SUBJECT, None, &valid_payload).unwrap();
        assert!(outcome.valid);

        // `email` is missing.
        let invalid_payload = json!({
            "user_id": "123"
        });
        let outcome = registry.validate(SUBJECT, None, &invalid_payload).unwrap();
        assert!(!outcome.valid);
        assert!(!outcome.errors.is_empty());
    }

    #[test]
    fn test_backward_compatibility() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig {
            default_compatibility: CompatibilityMode::Backward,
            ..Default::default()
        });

        registry
            .register_schema(
                SUBJECT.to_string(),
                required_only(&["user_id", "email"]),
                None,
                None,
            )
            .unwrap();

        // Same required fields: accepted.
        let unchanged = registry.register_schema(
            SUBJECT.to_string(),
            required_only(&["user_id", "email"]),
            None,
            None,
        );
        assert!(unchanged.is_ok());

        // A required field was dropped: rejected.
        let dropped_field = registry.register_schema(
            SUBJECT.to_string(),
            required_only(&["user_id"]),
            None,
            None,
        );
        assert!(dropped_field.is_err());
    }
}