Skip to main content

allsource_core/application/services/
schema.rs

1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use serde_json::Value as JsonValue;
7use std::collections::HashMap;
8use std::sync::Arc;
9use uuid::Uuid;
10
11/// Compatibility mode for schema evolution
12#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
14pub enum CompatibilityMode {
15    /// No compatibility checking
16    None,
17    /// New schema must be backward compatible (new fields optional)
18    Backward,
19    /// New schema must be forward compatible (old fields preserved)
20    Forward,
21    /// New schema must be both backward and forward compatible
22    Full,
23}
24
25impl Default for CompatibilityMode {
26    fn default() -> Self {
27        Self::Backward
28    }
29}
30
31/// Schema definition with versioning
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct Schema {
34    /// Unique schema ID
35    pub id: Uuid,
36
37    /// Subject/topic name (e.g., "user.created", "order.placed")
38    pub subject: String,
39
40    /// Schema version number
41    pub version: u32,
42
43    /// JSON Schema definition
44    pub schema: JsonValue,
45
46    /// When this schema was registered
47    pub created_at: DateTime<Utc>,
48
49    /// Schema description/documentation
50    pub description: Option<String>,
51
52    /// Tags for organization
53    pub tags: Vec<String>,
54}
55
56impl Schema {
57    pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
58        Self {
59            id: Uuid::new_v4(),
60            subject,
61            version,
62            schema,
63            created_at: Utc::now(),
64            description: None,
65            tags: Vec::new(),
66        }
67    }
68}
69
70/// Request to register a new schema
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct RegisterSchemaRequest {
73    pub subject: String,
74    pub schema: JsonValue,
75    pub description: Option<String>,
76    pub tags: Option<Vec<String>>,
77}
78
79/// Response from schema registration
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct RegisterSchemaResponse {
82    pub schema_id: Uuid,
83    pub subject: String,
84    pub version: u32,
85    pub created_at: DateTime<Utc>,
86}
87
88/// Request to validate an event against a schema
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct ValidateEventRequest {
91    pub subject: String,
92    pub version: Option<u32>,
93    pub payload: JsonValue,
94}
95
96/// Response from event validation
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ValidateEventResponse {
99    pub valid: bool,
100    pub errors: Vec<String>,
101    pub schema_version: u32,
102}
103
104/// Schema compatibility check result
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct CompatibilityCheckResult {
107    pub compatible: bool,
108    pub compatibility_mode: CompatibilityMode,
109    pub issues: Vec<String>,
110}
111
112/// Statistics about the schema registry
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct SchemaRegistryStats {
115    pub total_schemas: usize,
116    pub total_subjects: usize,
117    pub validations_performed: u64,
118    pub validation_failures: u64,
119}
120
121/// Configuration for the schema registry
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct SchemaRegistryConfig {
124    /// Default compatibility mode
125    pub default_compatibility: CompatibilityMode,
126
127    /// Whether to auto-register schemas on first use
128    pub auto_register: bool,
129
130    /// Whether to enforce schema validation on ingestion
131    pub enforce_validation: bool,
132}
133
134impl Default for SchemaRegistryConfig {
135    fn default() -> Self {
136        Self {
137            default_compatibility: CompatibilityMode::Backward,
138            auto_register: false,
139            enforce_validation: false,
140        }
141    }
142}
143
144/// Central registry for managing event schemas
145pub struct SchemaRegistry {
146    /// Schemas organized by subject and version - using DashMap for lock-free concurrent access
147    /// Key: subject -> version -> Schema
148    schemas: Arc<DashMap<String, HashMap<u32, Schema>>>,
149
150    /// Latest version for each subject
151    latest_versions: Arc<DashMap<String, u32>>,
152
153    /// Compatibility mode for each subject
154    compatibility_modes: Arc<DashMap<String, CompatibilityMode>>,
155
156    /// Configuration
157    config: SchemaRegistryConfig,
158
159    /// Statistics
160    stats: Arc<RwLock<SchemaRegistryStats>>,
161}
162
163impl SchemaRegistry {
164    pub fn new(config: SchemaRegistryConfig) -> Self {
165        Self {
166            schemas: Arc::new(DashMap::new()),
167            latest_versions: Arc::new(DashMap::new()),
168            compatibility_modes: Arc::new(DashMap::new()),
169            config,
170            stats: Arc::new(RwLock::new(SchemaRegistryStats {
171                total_schemas: 0,
172                total_subjects: 0,
173                validations_performed: 0,
174                validation_failures: 0,
175            })),
176        }
177    }
178
179    /// Register a new schema or return existing if identical
180    pub fn register_schema(
181        &self,
182        subject: String,
183        schema: JsonValue,
184        description: Option<String>,
185        tags: Option<Vec<String>>,
186    ) -> Result<RegisterSchemaResponse> {
187        // Determine next version
188        let next_version = self.latest_versions.get(&subject).map(|v| *v + 1).unwrap_or(1);
189
190        // Check compatibility with previous version if it exists
191        if next_version > 1 {
192            let prev_version = next_version - 1;
193            if let Some(subject_schemas) = self.schemas.get(&subject) {
194                if let Some(prev_schema) = subject_schemas.get(&prev_version) {
195                    let compatibility = self.get_compatibility_mode(&subject);
196                    let check_result =
197                        self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
198
199                    if !check_result.compatible {
200                        return Err(AllSourceError::ValidationError(format!(
201                            "Schema compatibility check failed: {}",
202                            check_result.issues.join(", ")
203                        )));
204                    }
205                }
206            }
207        }
208
209        // Create and store the schema
210        let mut new_schema = Schema::new(subject.clone(), next_version, schema);
211        new_schema.description = description;
212        new_schema.tags = tags.unwrap_or_default();
213
214        let schema_id = new_schema.id;
215        let created_at = new_schema.created_at;
216
217        // Get or create subject entry and insert the schema
218        self.schemas.entry(subject.clone()).or_default().insert(next_version, new_schema);
219        self.latest_versions.insert(subject.clone(), next_version);
220
221        // Update stats
222        let mut stats = self.stats.write();
223        stats.total_schemas += 1;
224        if next_version == 1 {
225            stats.total_subjects += 1;
226        }
227
228        tracing::info!(
229            "📋 Registered schema v{} for subject '{}' (ID: {})",
230            next_version,
231            subject,
232            schema_id
233        );
234
235        Ok(RegisterSchemaResponse {
236            schema_id,
237            subject,
238            version: next_version,
239            created_at,
240        })
241    }
242
243    /// Get a schema by subject and version (or latest if no version specified)
244    pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
245        let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
246            AllSourceError::ValidationError(format!("Subject not found: {subject}"))
247        })?;
248
249        let version = match version {
250            Some(v) => v,
251            None => {
252                *self.latest_versions.get(subject).ok_or_else(|| {
253                    AllSourceError::ValidationError(format!("No versions for subject: {subject}"))
254                })?
255            }
256        };
257
258        subject_schemas.get(&version).cloned().ok_or_else(|| {
259            AllSourceError::ValidationError(format!(
260                "Schema version {} not found for subject: {}",
261                version, subject
262            ))
263        })
264    }
265
266    /// List all versions of a schema subject
267    pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
268        let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
269            AllSourceError::ValidationError(format!("Subject not found: {subject}"))
270        })?;
271
272        let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
273        versions.sort_unstable();
274
275        Ok(versions)
276    }
277
278    /// List all schema subjects
279    pub fn list_subjects(&self) -> Vec<String> {
280        self.schemas.iter().map(|entry| entry.key().clone()).collect()
281    }
282
283    /// Validate a payload against a schema
284    pub fn validate(
285        &self,
286        subject: &str,
287        version: Option<u32>,
288        payload: &JsonValue,
289    ) -> Result<ValidateEventResponse> {
290        let schema = self.get_schema(subject, version)?;
291
292        let validation_result = self.validate_json(payload, &schema.schema);
293
294        // Update stats
295        let mut stats = self.stats.write();
296        stats.validations_performed += 1;
297        if !validation_result.is_empty() {
298            stats.validation_failures += 1;
299        }
300
301        Ok(ValidateEventResponse {
302            valid: validation_result.is_empty(),
303            errors: validation_result,
304            schema_version: schema.version,
305        })
306    }
307
308    /// Internal JSON Schema validation
309    fn validate_json(&self, data: &JsonValue, schema: &JsonValue) -> Vec<String> {
310        let mut errors = Vec::new();
311
312        // Basic JSON Schema validation
313        // In production, use jsonschema crate, but implementing basic checks here
314
315        // Check required fields
316        if let Some(required) = schema.get("required").and_then(|r| r.as_array()) {
317            if let Some(obj) = data.as_object() {
318                for req_field in required {
319                    if let Some(field_name) = req_field.as_str() {
320                        if !obj.contains_key(field_name) {
321                            errors.push(format!("Missing required field: {field_name}"));
322                        }
323                    }
324                }
325            }
326        }
327
328        // Check type
329        if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
330            let actual_type = match data {
331                JsonValue::Null => "null",
332                JsonValue::Bool(_) => "boolean",
333                JsonValue::Number(_) => "number",
334                JsonValue::String(_) => "string",
335                JsonValue::Array(_) => "array",
336                JsonValue::Object(_) => "object",
337            };
338
339            if expected_type != actual_type {
340                errors.push(format!(
341                    "Type mismatch: expected {}, got {}",
342                    expected_type, actual_type
343                ));
344            }
345        }
346
347        // Check properties
348        if let (Some(properties), Some(data_obj)) = (
349            schema.get("properties").and_then(|p| p.as_object()),
350            data.as_object(),
351        ) {
352            for (key, value) in data_obj {
353                if let Some(prop_schema) = properties.get(key) {
354                    let nested_errors = self.validate_json(value, prop_schema);
355                    for err in nested_errors {
356                        errors.push(format!("{key}.{err}"));
357                    }
358                }
359            }
360        }
361
362        errors
363    }
364
365    /// Check compatibility between two schemas
366    fn check_compatibility(
367        &self,
368        old_schema: &JsonValue,
369        new_schema: &JsonValue,
370        mode: CompatibilityMode,
371    ) -> Result<CompatibilityCheckResult> {
372        let mut issues = Vec::new();
373
374        match mode {
375            CompatibilityMode::None => {
376                return Ok(CompatibilityCheckResult {
377                    compatible: true,
378                    compatibility_mode: mode,
379                    issues: Vec::new(),
380                });
381            }
382            CompatibilityMode::Backward => {
383                // Check that all old required fields are still required
384                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
385            }
386            CompatibilityMode::Forward => {
387                // Check that all new required fields were in old schema
388                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
389            }
390            CompatibilityMode::Full => {
391                // Check both directions
392                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
393                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
394            }
395        }
396
397        Ok(CompatibilityCheckResult {
398            compatible: issues.is_empty(),
399            compatibility_mode: mode,
400            issues,
401        })
402    }
403
404    fn check_backward_compatibility(
405        &self,
406        old_schema: &JsonValue,
407        new_schema: &JsonValue,
408    ) -> Vec<String> {
409        let mut issues = Vec::new();
410
411        // Get required fields from old schema
412        if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
413            let new_required = new_schema
414                .get("required")
415                .and_then(|r| r.as_array())
416                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
417                .unwrap_or_default();
418
419            for old_req in old_required {
420                if let Some(field_name) = old_req.as_str() {
421                    if !new_required.contains(&field_name) {
422                        issues.push(format!(
423                            "Backward compatibility: required field '{}' removed",
424                            field_name
425                        ));
426                    }
427                }
428            }
429        }
430
431        issues
432    }
433
434    fn check_forward_compatibility(
435        &self,
436        old_schema: &JsonValue,
437        new_schema: &JsonValue,
438    ) -> Vec<String> {
439        let mut issues = Vec::new();
440
441        // Get required fields from new schema
442        if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
443            let old_required = old_schema
444                .get("required")
445                .and_then(|r| r.as_array())
446                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
447                .unwrap_or_default();
448
449            for new_req in new_required {
450                if let Some(field_name) = new_req.as_str() {
451                    if !old_required.contains(&field_name) {
452                        issues.push(format!(
453                            "Forward compatibility: new required field '{}' added",
454                            field_name
455                        ));
456                    }
457                }
458            }
459        }
460
461        issues
462    }
463
464    /// Set compatibility mode for a subject
465    pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
466        self.compatibility_modes.insert(subject, mode);
467    }
468
469    /// Get compatibility mode for a subject (or default)
470    pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
471        self.compatibility_modes
472            .get(subject)
473            .map(|entry| *entry.value())
474            .unwrap_or(self.config.default_compatibility)
475    }
476
477    /// Delete a specific schema version
478    pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
479        if let Some(mut subject_schemas) = self.schemas.get_mut(subject) {
480            if subject_schemas.remove(&version).is_some() {
481                tracing::info!("🗑️  Deleted schema v{} for subject '{}'", version, subject);
482
483                // Update stats
484                let mut stats = self.stats.write();
485                stats.total_schemas = stats.total_schemas.saturating_sub(1);
486
487                return Ok(true);
488            }
489        }
490
491        Ok(false)
492    }
493
494    /// Get registry statistics
495    pub fn stats(&self) -> SchemaRegistryStats {
496        self.stats.read().clone()
497    }
498
499    /// Get registry configuration
500    pub fn config(&self) -> &SchemaRegistryConfig {
501        &self.config
502    }
503}
504
505#[cfg(test)]
506mod tests {
507    use super::*;
508    use serde_json::json;
509
510    #[test]
511    fn test_schema_registration() {
512        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
513
514        let schema = json!({
515            "type": "object",
516            "properties": {
517                "user_id": {"type": "string"},
518                "email": {"type": "string"}
519            },
520            "required": ["user_id", "email"]
521        });
522
523        let response = registry
524            .register_schema(
525                "user.created".to_string(),
526                schema,
527                Some("User creation event".to_string()),
528                None,
529            )
530            .unwrap();
531
532        assert_eq!(response.version, 1);
533        assert_eq!(response.subject, "user.created");
534    }
535
536    #[test]
537    fn test_schema_validation() {
538        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
539
540        let schema = json!({
541            "type": "object",
542            "properties": {
543                "user_id": {"type": "string"},
544                "email": {"type": "string"}
545            },
546            "required": ["user_id", "email"]
547        });
548
549        registry
550            .register_schema("user.created".to_string(), schema, None, None)
551            .unwrap();
552
553        // Valid payload
554        let valid_payload = json!({
555            "user_id": "123",
556            "email": "test@example.com"
557        });
558
559        let result = registry
560            .validate("user.created", None, &valid_payload)
561            .unwrap();
562        assert!(result.valid);
563
564        // Invalid payload (missing required field)
565        let invalid_payload = json!({
566            "user_id": "123"
567        });
568
569        let result = registry
570            .validate("user.created", None, &invalid_payload)
571            .unwrap();
572        assert!(!result.valid);
573        assert!(!result.errors.is_empty());
574    }
575
576    #[test]
577    fn test_backward_compatibility() {
578        let registry = SchemaRegistry::new(SchemaRegistryConfig {
579            default_compatibility: CompatibilityMode::Backward,
580            ..Default::default()
581        });
582
583        let schema_v1 = json!({
584            "type": "object",
585            "required": ["user_id", "email"]
586        });
587
588        registry
589            .register_schema("user.created".to_string(), schema_v1, None, None)
590            .unwrap();
591
592        // Compatible: adding optional field
593        let schema_v2 = json!({
594            "type": "object",
595            "required": ["user_id", "email"]
596        });
597
598        let result = registry.register_schema("user.created".to_string(), schema_v2, None, None);
599        assert!(result.is_ok());
600
601        // Incompatible: removing required field
602        let schema_v3 = json!({
603            "type": "object",
604            "required": ["user_id"]
605        });
606
607        let result = registry.register_schema("user.created".to_string(), schema_v3, None, None);
608        assert!(result.is_err());
609    }
610}