allsource_core/
schema.rs

1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use std::collections::HashMap;
7use std::sync::Arc;
8use uuid::Uuid;
9
10/// Compatibility mode for schema evolution
11#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14    /// No compatibility checking
15    None,
16    /// New schema must be backward compatible (new fields optional)
17    Backward,
18    /// New schema must be forward compatible (old fields preserved)
19    Forward,
20    /// New schema must be both backward and forward compatible
21    Full,
22}
23
24impl Default for CompatibilityMode {
25    fn default() -> Self {
26        Self::Backward
27    }
28}
29
30/// Schema definition with versioning
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Schema {
33    /// Unique schema ID
34    pub id: Uuid,
35
36    /// Subject/topic name (e.g., "user.created", "order.placed")
37    pub subject: String,
38
39    /// Schema version number
40    pub version: u32,
41
42    /// JSON Schema definition
43    pub schema: JsonValue,
44
45    /// When this schema was registered
46    pub created_at: DateTime<Utc>,
47
48    /// Schema description/documentation
49    pub description: Option<String>,
50
51    /// Tags for organization
52    pub tags: Vec<String>,
53}
54
55impl Schema {
56    pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
57        Self {
58            id: Uuid::new_v4(),
59            subject,
60            version,
61            schema,
62            created_at: Utc::now(),
63            description: None,
64            tags: Vec::new(),
65        }
66    }
67}
68
69/// Request to register a new schema
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct RegisterSchemaRequest {
72    pub subject: String,
73    pub schema: JsonValue,
74    pub description: Option<String>,
75    pub tags: Option<Vec<String>>,
76}
77
78/// Response from schema registration
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct RegisterSchemaResponse {
81    pub schema_id: Uuid,
82    pub subject: String,
83    pub version: u32,
84    pub created_at: DateTime<Utc>,
85}
86
87/// Request to validate an event against a schema
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct ValidateEventRequest {
90    pub subject: String,
91    pub version: Option<u32>,
92    pub payload: JsonValue,
93}
94
95/// Response from event validation
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ValidateEventResponse {
98    pub valid: bool,
99    pub errors: Vec<String>,
100    pub schema_version: u32,
101}
102
103/// Schema compatibility check result
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct CompatibilityCheckResult {
106    pub compatible: bool,
107    pub compatibility_mode: CompatibilityMode,
108    pub issues: Vec<String>,
109}
110
111/// Statistics about the schema registry
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct SchemaRegistryStats {
114    pub total_schemas: usize,
115    pub total_subjects: usize,
116    pub validations_performed: u64,
117    pub validation_failures: u64,
118}
119
120/// Configuration for the schema registry
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SchemaRegistryConfig {
123    /// Default compatibility mode
124    pub default_compatibility: CompatibilityMode,
125
126    /// Whether to auto-register schemas on first use
127    pub auto_register: bool,
128
129    /// Whether to enforce schema validation on ingestion
130    pub enforce_validation: bool,
131}
132
133impl Default for SchemaRegistryConfig {
134    fn default() -> Self {
135        Self {
136            default_compatibility: CompatibilityMode::Backward,
137            auto_register: false,
138            enforce_validation: false,
139        }
140    }
141}
142
143/// Central registry for managing event schemas
144pub struct SchemaRegistry {
145    /// Schemas organized by subject and version
146    /// Key: subject -> version -> Schema
147    schemas: Arc<RwLock<HashMap<String, HashMap<u32, Schema>>>>,
148
149    /// Latest version for each subject
150    latest_versions: Arc<RwLock<HashMap<String, u32>>>,
151
152    /// Compatibility mode for each subject
153    compatibility_modes: Arc<RwLock<HashMap<String, CompatibilityMode>>>,
154
155    /// Configuration
156    config: SchemaRegistryConfig,
157
158    /// Statistics
159    stats: Arc<RwLock<SchemaRegistryStats>>,
160}
161
162impl SchemaRegistry {
163    pub fn new(config: SchemaRegistryConfig) -> Self {
164        Self {
165            schemas: Arc::new(RwLock::new(HashMap::new())),
166            latest_versions: Arc::new(RwLock::new(HashMap::new())),
167            compatibility_modes: Arc::new(RwLock::new(HashMap::new())),
168            config,
169            stats: Arc::new(RwLock::new(SchemaRegistryStats {
170                total_schemas: 0,
171                total_subjects: 0,
172                validations_performed: 0,
173                validation_failures: 0,
174            })),
175        }
176    }
177
178    /// Register a new schema or return existing if identical
179    pub fn register_schema(
180        &self,
181        subject: String,
182        schema: JsonValue,
183        description: Option<String>,
184        tags: Option<Vec<String>>,
185    ) -> Result<RegisterSchemaResponse> {
186        let mut schemas = self.schemas.write();
187        let mut latest_versions = self.latest_versions.write();
188
189        // Get or create subject entry
190        let subject_schemas = schemas.entry(subject.clone()).or_insert_with(HashMap::new);
191
192        // Determine next version
193        let next_version = latest_versions.get(&subject).map(|v| v + 1).unwrap_or(1);
194
195        // Check compatibility with previous version if it exists
196        if next_version > 1 {
197            let prev_version = next_version - 1;
198            if let Some(prev_schema) = subject_schemas.get(&prev_version) {
199                let compatibility = self.get_compatibility_mode(&subject);
200                let check_result = self.check_compatibility(
201                    &prev_schema.schema,
202                    &schema,
203                    compatibility,
204                )?;
205
206                if !check_result.compatible {
207                    return Err(AllSourceError::ValidationError(format!(
208                        "Schema compatibility check failed: {}",
209                        check_result.issues.join(", ")
210                    )));
211                }
212            }
213        }
214
215        // Create and store the schema
216        let mut new_schema = Schema::new(subject.clone(), next_version, schema);
217        new_schema.description = description;
218        new_schema.tags = tags.unwrap_or_default();
219
220        let schema_id = new_schema.id;
221        let created_at = new_schema.created_at;
222
223        subject_schemas.insert(next_version, new_schema);
224        latest_versions.insert(subject.clone(), next_version);
225
226        // Update stats
227        let mut stats = self.stats.write();
228        stats.total_schemas += 1;
229        if next_version == 1 {
230            stats.total_subjects += 1;
231        }
232
233        tracing::info!(
234            "📋 Registered schema v{} for subject '{}' (ID: {})",
235            next_version,
236            subject,
237            schema_id
238        );
239
240        Ok(RegisterSchemaResponse {
241            schema_id,
242            subject,
243            version: next_version,
244            created_at,
245        })
246    }
247
248    /// Get a schema by subject and version (or latest if no version specified)
249    pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
250        let schemas = self.schemas.read();
251
252        let subject_schemas = schemas
253            .get(subject)
254            .ok_or_else(|| AllSourceError::ValidationError(format!("Subject not found: {}", subject)))?;
255
256        let version = match version {
257            Some(v) => v,
258            None => {
259                let latest_versions = self.latest_versions.read();
260                *latest_versions.get(subject).ok_or_else(|| {
261                    AllSourceError::ValidationError(format!("No versions for subject: {}", subject))
262                })?
263            }
264        };
265
266        subject_schemas
267            .get(&version)
268            .cloned()
269            .ok_or_else(|| {
270                AllSourceError::ValidationError(format!(
271                    "Schema version {} not found for subject: {}",
272                    version, subject
273                ))
274            })
275    }
276
277    /// List all versions of a schema subject
278    pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
279        let schemas = self.schemas.read();
280
281        let subject_schemas = schemas
282            .get(subject)
283            .ok_or_else(|| AllSourceError::ValidationError(format!("Subject not found: {}", subject)))?;
284
285        let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
286        versions.sort_unstable();
287
288        Ok(versions)
289    }
290
291    /// List all schema subjects
292    pub fn list_subjects(&self) -> Vec<String> {
293        let schemas = self.schemas.read();
294        schemas.keys().cloned().collect()
295    }
296
297    /// Validate a payload against a schema
298    pub fn validate(
299        &self,
300        subject: &str,
301        version: Option<u32>,
302        payload: &JsonValue,
303    ) -> Result<ValidateEventResponse> {
304        let schema = self.get_schema(subject, version)?;
305
306        let validation_result = self.validate_json(payload, &schema.schema);
307
308        // Update stats
309        let mut stats = self.stats.write();
310        stats.validations_performed += 1;
311        if !validation_result.is_empty() {
312            stats.validation_failures += 1;
313        }
314
315        Ok(ValidateEventResponse {
316            valid: validation_result.is_empty(),
317            errors: validation_result,
318            schema_version: schema.version,
319        })
320    }
321
322    /// Internal JSON Schema validation
323    fn validate_json(&self, data: &JsonValue, schema: &JsonValue) -> Vec<String> {
324        let mut errors = Vec::new();
325
326        // Basic JSON Schema validation
327        // In production, use jsonschema crate, but implementing basic checks here
328
329        // Check required fields
330        if let Some(required) = schema.get("required").and_then(|r| r.as_array()) {
331            if let Some(obj) = data.as_object() {
332                for req_field in required {
333                    if let Some(field_name) = req_field.as_str() {
334                        if !obj.contains_key(field_name) {
335                            errors.push(format!("Missing required field: {}", field_name));
336                        }
337                    }
338                }
339            }
340        }
341
342        // Check type
343        if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
344            let actual_type = match data {
345                JsonValue::Null => "null",
346                JsonValue::Bool(_) => "boolean",
347                JsonValue::Number(_) => "number",
348                JsonValue::String(_) => "string",
349                JsonValue::Array(_) => "array",
350                JsonValue::Object(_) => "object",
351            };
352
353            if expected_type != actual_type {
354                errors.push(format!(
355                    "Type mismatch: expected {}, got {}",
356                    expected_type, actual_type
357                ));
358            }
359        }
360
361        // Check properties
362        if let (Some(properties), Some(data_obj)) = (
363            schema.get("properties").and_then(|p| p.as_object()),
364            data.as_object(),
365        ) {
366            for (key, value) in data_obj {
367                if let Some(prop_schema) = properties.get(key) {
368                    let nested_errors = self.validate_json(value, prop_schema);
369                    for err in nested_errors {
370                        errors.push(format!("{}.{}", key, err));
371                    }
372                }
373            }
374        }
375
376        errors
377    }
378
379    /// Check compatibility between two schemas
380    fn check_compatibility(
381        &self,
382        old_schema: &JsonValue,
383        new_schema: &JsonValue,
384        mode: CompatibilityMode,
385    ) -> Result<CompatibilityCheckResult> {
386        let mut issues = Vec::new();
387
388        match mode {
389            CompatibilityMode::None => {
390                return Ok(CompatibilityCheckResult {
391                    compatible: true,
392                    compatibility_mode: mode,
393                    issues: Vec::new(),
394                });
395            }
396            CompatibilityMode::Backward => {
397                // Check that all old required fields are still required
398                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
399            }
400            CompatibilityMode::Forward => {
401                // Check that all new required fields were in old schema
402                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
403            }
404            CompatibilityMode::Full => {
405                // Check both directions
406                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
407                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
408            }
409        }
410
411        Ok(CompatibilityCheckResult {
412            compatible: issues.is_empty(),
413            compatibility_mode: mode,
414            issues,
415        })
416    }
417
418    fn check_backward_compatibility(
419        &self,
420        old_schema: &JsonValue,
421        new_schema: &JsonValue,
422    ) -> Vec<String> {
423        let mut issues = Vec::new();
424
425        // Get required fields from old schema
426        if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
427            let new_required = new_schema
428                .get("required")
429                .and_then(|r| r.as_array())
430                .map(|arr| {
431                    arr.iter()
432                        .filter_map(|v| v.as_str())
433                        .collect::<Vec<_>>()
434                })
435                .unwrap_or_default();
436
437            for old_req in old_required {
438                if let Some(field_name) = old_req.as_str() {
439                    if !new_required.contains(&field_name) {
440                        issues.push(format!(
441                            "Backward compatibility: required field '{}' removed",
442                            field_name
443                        ));
444                    }
445                }
446            }
447        }
448
449        issues
450    }
451
452    fn check_forward_compatibility(
453        &self,
454        old_schema: &JsonValue,
455        new_schema: &JsonValue,
456    ) -> Vec<String> {
457        let mut issues = Vec::new();
458
459        // Get required fields from new schema
460        if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
461            let old_required = old_schema
462                .get("required")
463                .and_then(|r| r.as_array())
464                .map(|arr| {
465                    arr.iter()
466                        .filter_map(|v| v.as_str())
467                        .collect::<Vec<_>>()
468                })
469                .unwrap_or_default();
470
471            for new_req in new_required {
472                if let Some(field_name) = new_req.as_str() {
473                    if !old_required.contains(&field_name) {
474                        issues.push(format!(
475                            "Forward compatibility: new required field '{}' added",
476                            field_name
477                        ));
478                    }
479                }
480            }
481        }
482
483        issues
484    }
485
486    /// Set compatibility mode for a subject
487    pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
488        let mut modes = self.compatibility_modes.write();
489        modes.insert(subject, mode);
490    }
491
492    /// Get compatibility mode for a subject (or default)
493    pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
494        let modes = self.compatibility_modes.read();
495        modes.get(subject).copied().unwrap_or(self.config.default_compatibility)
496    }
497
498    /// Delete a specific schema version
499    pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
500        let mut schemas = self.schemas.write();
501
502        if let Some(subject_schemas) = schemas.get_mut(subject) {
503            if subject_schemas.remove(&version).is_some() {
504                tracing::info!("🗑️  Deleted schema v{} for subject '{}'", version, subject);
505
506                // Update stats
507                let mut stats = self.stats.write();
508                stats.total_schemas = stats.total_schemas.saturating_sub(1);
509
510                return Ok(true);
511            }
512        }
513
514        Ok(false)
515    }
516
517    /// Get registry statistics
518    pub fn stats(&self) -> SchemaRegistryStats {
519        self.stats.read().clone()
520    }
521
522    /// Get registry configuration
523    pub fn config(&self) -> &SchemaRegistryConfig {
524        &self.config
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Subject used by every test.
    const SUBJECT: &str = "user.created";

    /// Object schema with two required string fields, `user_id` and `email`.
    fn user_created_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["user_id", "email"]
        })
    }

    /// The first schema registered for a subject becomes version 1.
    #[test]
    fn test_schema_registration() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        let response = registry
            .register_schema(
                SUBJECT.to_string(),
                user_created_schema(),
                Some("User creation event".to_string()),
                None,
            )
            .unwrap();

        assert_eq!(response.version, 1);
        assert_eq!(response.subject, SUBJECT);
    }

    /// Payloads are accepted or rejected based on the schema's required fields.
    #[test]
    fn test_schema_validation() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        registry
            .register_schema(SUBJECT.to_string(), user_created_schema(), None, None)
            .unwrap();

        // Valid payload
        let valid_payload = json!({
            "user_id": "123",
            "email": "test@example.com"
        });

        let result = registry.validate(SUBJECT, None, &valid_payload).unwrap();
        assert!(result.valid);
        assert!(result.errors.is_empty());
        assert_eq!(result.schema_version, 1);

        // Invalid payload (missing required field)
        let invalid_payload = json!({
            "user_id": "123"
        });

        let result = registry.validate(SUBJECT, None, &invalid_payload).unwrap();
        assert!(!result.valid);
        assert!(!result.errors.is_empty());
    }

    /// Under `Backward` mode an added optional field is accepted, while
    /// dropping a required field is rejected.
    #[test]
    fn test_backward_compatibility() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig {
            default_compatibility: CompatibilityMode::Backward,
            ..Default::default()
        });

        let schema_v1 = json!({
            "type": "object",
            "required": ["user_id", "email"]
        });

        registry
            .register_schema(SUBJECT.to_string(), schema_v1, None, None)
            .unwrap();

        // Compatible: adding optional field (declared under `properties`,
        // not listed in `required`)
        let schema_v2 = json!({
            "type": "object",
            "properties": {
                "nickname": {"type": "string"}
            },
            "required": ["user_id", "email"]
        });

        let accepted = registry
            .register_schema(SUBJECT.to_string(), schema_v2, None, None)
            .unwrap();
        assert_eq!(accepted.version, 2);

        // Incompatible: removing required field
        let schema_v3 = json!({
            "type": "object",
            "required": ["user_id"]
        });

        let result = registry.register_schema(SUBJECT.to_string(), schema_v3, None, None);
        assert!(result.is_err());

        // The rejected schema must not have been stored.
        assert_eq!(registry.list_versions(SUBJECT).unwrap(), vec![1, 2]);
    }
}