allsource_core/application/services/
schema.rs

1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use std::collections::HashMap;
7use std::sync::Arc;
8use uuid::Uuid;
9
10/// Compatibility mode for schema evolution
11#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14    /// No compatibility checking
15    None,
16    /// New schema must be backward compatible (new fields optional)
17    Backward,
18    /// New schema must be forward compatible (old fields preserved)
19    Forward,
20    /// New schema must be both backward and forward compatible
21    Full,
22}
23
24impl Default for CompatibilityMode {
25    fn default() -> Self {
26        Self::Backward
27    }
28}
29
30/// Schema definition with versioning
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Schema {
33    /// Unique schema ID
34    pub id: Uuid,
35
36    /// Subject/topic name (e.g., "user.created", "order.placed")
37    pub subject: String,
38
39    /// Schema version number
40    pub version: u32,
41
42    /// JSON Schema definition
43    pub schema: JsonValue,
44
45    /// When this schema was registered
46    pub created_at: DateTime<Utc>,
47
48    /// Schema description/documentation
49    pub description: Option<String>,
50
51    /// Tags for organization
52    pub tags: Vec<String>,
53}
54
55impl Schema {
56    pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
57        Self {
58            id: Uuid::new_v4(),
59            subject,
60            version,
61            schema,
62            created_at: Utc::now(),
63            description: None,
64            tags: Vec::new(),
65        }
66    }
67}
68
69/// Request to register a new schema
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct RegisterSchemaRequest {
72    pub subject: String,
73    pub schema: JsonValue,
74    pub description: Option<String>,
75    pub tags: Option<Vec<String>>,
76}
77
78/// Response from schema registration
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct RegisterSchemaResponse {
81    pub schema_id: Uuid,
82    pub subject: String,
83    pub version: u32,
84    pub created_at: DateTime<Utc>,
85}
86
87/// Request to validate an event against a schema
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct ValidateEventRequest {
90    pub subject: String,
91    pub version: Option<u32>,
92    pub payload: JsonValue,
93}
94
95/// Response from event validation
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ValidateEventResponse {
98    pub valid: bool,
99    pub errors: Vec<String>,
100    pub schema_version: u32,
101}
102
103/// Schema compatibility check result
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct CompatibilityCheckResult {
106    pub compatible: bool,
107    pub compatibility_mode: CompatibilityMode,
108    pub issues: Vec<String>,
109}
110
111/// Statistics about the schema registry
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct SchemaRegistryStats {
114    pub total_schemas: usize,
115    pub total_subjects: usize,
116    pub validations_performed: u64,
117    pub validation_failures: u64,
118}
119
120/// Configuration for the schema registry
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SchemaRegistryConfig {
123    /// Default compatibility mode
124    pub default_compatibility: CompatibilityMode,
125
126    /// Whether to auto-register schemas on first use
127    pub auto_register: bool,
128
129    /// Whether to enforce schema validation on ingestion
130    pub enforce_validation: bool,
131}
132
133impl Default for SchemaRegistryConfig {
134    fn default() -> Self {
135        Self {
136            default_compatibility: CompatibilityMode::Backward,
137            auto_register: false,
138            enforce_validation: false,
139        }
140    }
141}
142
143/// Central registry for managing event schemas
144pub struct SchemaRegistry {
145    /// Schemas organized by subject and version
146    /// Key: subject -> version -> Schema
147    schemas: Arc<RwLock<HashMap<String, HashMap<u32, Schema>>>>,
148
149    /// Latest version for each subject
150    latest_versions: Arc<RwLock<HashMap<String, u32>>>,
151
152    /// Compatibility mode for each subject
153    compatibility_modes: Arc<RwLock<HashMap<String, CompatibilityMode>>>,
154
155    /// Configuration
156    config: SchemaRegistryConfig,
157
158    /// Statistics
159    stats: Arc<RwLock<SchemaRegistryStats>>,
160}
161
162impl SchemaRegistry {
163    pub fn new(config: SchemaRegistryConfig) -> Self {
164        Self {
165            schemas: Arc::new(RwLock::new(HashMap::new())),
166            latest_versions: Arc::new(RwLock::new(HashMap::new())),
167            compatibility_modes: Arc::new(RwLock::new(HashMap::new())),
168            config,
169            stats: Arc::new(RwLock::new(SchemaRegistryStats {
170                total_schemas: 0,
171                total_subjects: 0,
172                validations_performed: 0,
173                validation_failures: 0,
174            })),
175        }
176    }
177
178    /// Register a new schema or return existing if identical
179    pub fn register_schema(
180        &self,
181        subject: String,
182        schema: JsonValue,
183        description: Option<String>,
184        tags: Option<Vec<String>>,
185    ) -> Result<RegisterSchemaResponse> {
186        let mut schemas = self.schemas.write();
187        let mut latest_versions = self.latest_versions.write();
188
189        // Get or create subject entry
190        let subject_schemas = schemas.entry(subject.clone()).or_insert_with(HashMap::new);
191
192        // Determine next version
193        let next_version = latest_versions.get(&subject).map(|v| v + 1).unwrap_or(1);
194
195        // Check compatibility with previous version if it exists
196        if next_version > 1 {
197            let prev_version = next_version - 1;
198            if let Some(prev_schema) = subject_schemas.get(&prev_version) {
199                let compatibility = self.get_compatibility_mode(&subject);
200                let check_result =
201                    self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
202
203                if !check_result.compatible {
204                    return Err(AllSourceError::ValidationError(format!(
205                        "Schema compatibility check failed: {}",
206                        check_result.issues.join(", ")
207                    )));
208                }
209            }
210        }
211
212        // Create and store the schema
213        let mut new_schema = Schema::new(subject.clone(), next_version, schema);
214        new_schema.description = description;
215        new_schema.tags = tags.unwrap_or_default();
216
217        let schema_id = new_schema.id;
218        let created_at = new_schema.created_at;
219
220        subject_schemas.insert(next_version, new_schema);
221        latest_versions.insert(subject.clone(), next_version);
222
223        // Update stats
224        let mut stats = self.stats.write();
225        stats.total_schemas += 1;
226        if next_version == 1 {
227            stats.total_subjects += 1;
228        }
229
230        tracing::info!(
231            "📋 Registered schema v{} for subject '{}' (ID: {})",
232            next_version,
233            subject,
234            schema_id
235        );
236
237        Ok(RegisterSchemaResponse {
238            schema_id,
239            subject,
240            version: next_version,
241            created_at,
242        })
243    }
244
245    /// Get a schema by subject and version (or latest if no version specified)
246    pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
247        let schemas = self.schemas.read();
248
249        let subject_schemas = schemas.get(subject).ok_or_else(|| {
250            AllSourceError::ValidationError(format!("Subject not found: {}", subject))
251        })?;
252
253        let version = match version {
254            Some(v) => v,
255            None => {
256                let latest_versions = self.latest_versions.read();
257                *latest_versions.get(subject).ok_or_else(|| {
258                    AllSourceError::ValidationError(format!("No versions for subject: {}", subject))
259                })?
260            }
261        };
262
263        subject_schemas.get(&version).cloned().ok_or_else(|| {
264            AllSourceError::ValidationError(format!(
265                "Schema version {} not found for subject: {}",
266                version, subject
267            ))
268        })
269    }
270
271    /// List all versions of a schema subject
272    pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
273        let schemas = self.schemas.read();
274
275        let subject_schemas = schemas.get(subject).ok_or_else(|| {
276            AllSourceError::ValidationError(format!("Subject not found: {}", subject))
277        })?;
278
279        let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
280        versions.sort_unstable();
281
282        Ok(versions)
283    }
284
285    /// List all schema subjects
286    pub fn list_subjects(&self) -> Vec<String> {
287        let schemas = self.schemas.read();
288        schemas.keys().cloned().collect()
289    }
290
291    /// Validate a payload against a schema
292    pub fn validate(
293        &self,
294        subject: &str,
295        version: Option<u32>,
296        payload: &JsonValue,
297    ) -> Result<ValidateEventResponse> {
298        let schema = self.get_schema(subject, version)?;
299
300        let validation_result = self.validate_json(payload, &schema.schema);
301
302        // Update stats
303        let mut stats = self.stats.write();
304        stats.validations_performed += 1;
305        if !validation_result.is_empty() {
306            stats.validation_failures += 1;
307        }
308
309        Ok(ValidateEventResponse {
310            valid: validation_result.is_empty(),
311            errors: validation_result,
312            schema_version: schema.version,
313        })
314    }
315
316    /// Internal JSON Schema validation
317    fn validate_json(&self, data: &JsonValue, schema: &JsonValue) -> Vec<String> {
318        let mut errors = Vec::new();
319
320        // Basic JSON Schema validation
321        // In production, use jsonschema crate, but implementing basic checks here
322
323        // Check required fields
324        if let Some(required) = schema.get("required").and_then(|r| r.as_array()) {
325            if let Some(obj) = data.as_object() {
326                for req_field in required {
327                    if let Some(field_name) = req_field.as_str() {
328                        if !obj.contains_key(field_name) {
329                            errors.push(format!("Missing required field: {}", field_name));
330                        }
331                    }
332                }
333            }
334        }
335
336        // Check type
337        if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
338            let actual_type = match data {
339                JsonValue::Null => "null",
340                JsonValue::Bool(_) => "boolean",
341                JsonValue::Number(_) => "number",
342                JsonValue::String(_) => "string",
343                JsonValue::Array(_) => "array",
344                JsonValue::Object(_) => "object",
345            };
346
347            if expected_type != actual_type {
348                errors.push(format!(
349                    "Type mismatch: expected {}, got {}",
350                    expected_type, actual_type
351                ));
352            }
353        }
354
355        // Check properties
356        if let (Some(properties), Some(data_obj)) = (
357            schema.get("properties").and_then(|p| p.as_object()),
358            data.as_object(),
359        ) {
360            for (key, value) in data_obj {
361                if let Some(prop_schema) = properties.get(key) {
362                    let nested_errors = self.validate_json(value, prop_schema);
363                    for err in nested_errors {
364                        errors.push(format!("{}.{}", key, err));
365                    }
366                }
367            }
368        }
369
370        errors
371    }
372
373    /// Check compatibility between two schemas
374    fn check_compatibility(
375        &self,
376        old_schema: &JsonValue,
377        new_schema: &JsonValue,
378        mode: CompatibilityMode,
379    ) -> Result<CompatibilityCheckResult> {
380        let mut issues = Vec::new();
381
382        match mode {
383            CompatibilityMode::None => {
384                return Ok(CompatibilityCheckResult {
385                    compatible: true,
386                    compatibility_mode: mode,
387                    issues: Vec::new(),
388                });
389            }
390            CompatibilityMode::Backward => {
391                // Check that all old required fields are still required
392                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
393            }
394            CompatibilityMode::Forward => {
395                // Check that all new required fields were in old schema
396                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
397            }
398            CompatibilityMode::Full => {
399                // Check both directions
400                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
401                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
402            }
403        }
404
405        Ok(CompatibilityCheckResult {
406            compatible: issues.is_empty(),
407            compatibility_mode: mode,
408            issues,
409        })
410    }
411
412    fn check_backward_compatibility(
413        &self,
414        old_schema: &JsonValue,
415        new_schema: &JsonValue,
416    ) -> Vec<String> {
417        let mut issues = Vec::new();
418
419        // Get required fields from old schema
420        if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
421            let new_required = new_schema
422                .get("required")
423                .and_then(|r| r.as_array())
424                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
425                .unwrap_or_default();
426
427            for old_req in old_required {
428                if let Some(field_name) = old_req.as_str() {
429                    if !new_required.contains(&field_name) {
430                        issues.push(format!(
431                            "Backward compatibility: required field '{}' removed",
432                            field_name
433                        ));
434                    }
435                }
436            }
437        }
438
439        issues
440    }
441
442    fn check_forward_compatibility(
443        &self,
444        old_schema: &JsonValue,
445        new_schema: &JsonValue,
446    ) -> Vec<String> {
447        let mut issues = Vec::new();
448
449        // Get required fields from new schema
450        if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
451            let old_required = old_schema
452                .get("required")
453                .and_then(|r| r.as_array())
454                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
455                .unwrap_or_default();
456
457            for new_req in new_required {
458                if let Some(field_name) = new_req.as_str() {
459                    if !old_required.contains(&field_name) {
460                        issues.push(format!(
461                            "Forward compatibility: new required field '{}' added",
462                            field_name
463                        ));
464                    }
465                }
466            }
467        }
468
469        issues
470    }
471
472    /// Set compatibility mode for a subject
473    pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
474        let mut modes = self.compatibility_modes.write();
475        modes.insert(subject, mode);
476    }
477
478    /// Get compatibility mode for a subject (or default)
479    pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
480        let modes = self.compatibility_modes.read();
481        modes
482            .get(subject)
483            .copied()
484            .unwrap_or(self.config.default_compatibility)
485    }
486
487    /// Delete a specific schema version
488    pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
489        let mut schemas = self.schemas.write();
490
491        if let Some(subject_schemas) = schemas.get_mut(subject) {
492            if subject_schemas.remove(&version).is_some() {
493                tracing::info!("🗑️  Deleted schema v{} for subject '{}'", version, subject);
494
495                // Update stats
496                let mut stats = self.stats.write();
497                stats.total_schemas = stats.total_schemas.saturating_sub(1);
498
499                return Ok(true);
500            }
501        }
502
503        Ok(false)
504    }
505
506    /// Get registry statistics
507    pub fn stats(&self) -> SchemaRegistryStats {
508        self.stats.read().clone()
509    }
510
511    /// Get registry configuration
512    pub fn config(&self) -> &SchemaRegistryConfig {
513        &self.config
514    }
515}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520    use serde_json::json;
521
522    #[test]
523    fn test_schema_registration() {
524        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
525
526        let schema = json!({
527            "type": "object",
528            "properties": {
529                "user_id": {"type": "string"},
530                "email": {"type": "string"}
531            },
532            "required": ["user_id", "email"]
533        });
534
535        let response = registry
536            .register_schema(
537                "user.created".to_string(),
538                schema,
539                Some("User creation event".to_string()),
540                None,
541            )
542            .unwrap();
543
544        assert_eq!(response.version, 1);
545        assert_eq!(response.subject, "user.created");
546    }
547
548    #[test]
549    fn test_schema_validation() {
550        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
551
552        let schema = json!({
553            "type": "object",
554            "properties": {
555                "user_id": {"type": "string"},
556                "email": {"type": "string"}
557            },
558            "required": ["user_id", "email"]
559        });
560
561        registry
562            .register_schema("user.created".to_string(), schema, None, None)
563            .unwrap();
564
565        // Valid payload
566        let valid_payload = json!({
567            "user_id": "123",
568            "email": "test@example.com"
569        });
570
571        let result = registry
572            .validate("user.created", None, &valid_payload)
573            .unwrap();
574        assert!(result.valid);
575
576        // Invalid payload (missing required field)
577        let invalid_payload = json!({
578            "user_id": "123"
579        });
580
581        let result = registry
582            .validate("user.created", None, &invalid_payload)
583            .unwrap();
584        assert!(!result.valid);
585        assert!(!result.errors.is_empty());
586    }
587
588    #[test]
589    fn test_backward_compatibility() {
590        let registry = SchemaRegistry::new(SchemaRegistryConfig {
591            default_compatibility: CompatibilityMode::Backward,
592            ..Default::default()
593        });
594
595        let schema_v1 = json!({
596            "type": "object",
597            "required": ["user_id", "email"]
598        });
599
600        registry
601            .register_schema("user.created".to_string(), schema_v1, None, None)
602            .unwrap();
603
604        // Compatible: adding optional field
605        let schema_v2 = json!({
606            "type": "object",
607            "required": ["user_id", "email"]
608        });
609
610        let result = registry.register_schema("user.created".to_string(), schema_v2, None, None);
611        assert!(result.is_ok());
612
613        // Incompatible: removing required field
614        let schema_v3 = json!({
615            "type": "object",
616            "required": ["user_id"]
617        });
618
619        let result = registry.register_schema("user.created".to_string(), schema_v3, None, None);
620        assert!(result.is_err());
621    }
622}