Skip to main content

allsource_core/application/services/
schema.rs

1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use serde_json::Value as JsonValue;
7use std::{collections::HashMap, sync::Arc};
8use uuid::Uuid;
9
10/// Compatibility mode for schema evolution
11#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14    /// No compatibility checking
15    None,
16    /// New schema must be backward compatible (new fields optional)
17    #[default]
18    Backward,
19    /// New schema must be forward compatible (old fields preserved)
20    Forward,
21    /// New schema must be both backward and forward compatible
22    Full,
23}
24
25/// Schema definition with versioning
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Schema {
28    /// Unique schema ID
29    pub id: Uuid,
30
31    /// Subject/topic name (e.g., "user.created", "order.placed")
32    pub subject: String,
33
34    /// Schema version number
35    pub version: u32,
36
37    /// JSON Schema definition
38    pub schema: JsonValue,
39
40    /// When this schema was registered
41    pub created_at: DateTime<Utc>,
42
43    /// Schema description/documentation
44    pub description: Option<String>,
45
46    /// Tags for organization
47    pub tags: Vec<String>,
48}
49
50impl Schema {
51    pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
52        Self {
53            id: Uuid::new_v4(),
54            subject,
55            version,
56            schema,
57            created_at: Utc::now(),
58            description: None,
59            tags: Vec::new(),
60        }
61    }
62}
63
64/// Request to register a new schema
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct RegisterSchemaRequest {
67    pub subject: String,
68    pub schema: JsonValue,
69    pub description: Option<String>,
70    pub tags: Option<Vec<String>>,
71}
72
73/// Response from schema registration
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct RegisterSchemaResponse {
76    pub schema_id: Uuid,
77    pub subject: String,
78    pub version: u32,
79    pub created_at: DateTime<Utc>,
80}
81
82/// Request to validate an event against a schema
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ValidateEventRequest {
85    pub subject: String,
86    pub version: Option<u32>,
87    pub payload: JsonValue,
88}
89
90/// Response from event validation
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct ValidateEventResponse {
93    pub valid: bool,
94    pub errors: Vec<String>,
95    pub schema_version: u32,
96}
97
98/// Schema compatibility check result
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct CompatibilityCheckResult {
101    pub compatible: bool,
102    pub compatibility_mode: CompatibilityMode,
103    pub issues: Vec<String>,
104}
105
106/// Statistics about the schema registry
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct SchemaRegistryStats {
109    pub total_schemas: usize,
110    pub total_subjects: usize,
111    pub validations_performed: u64,
112    pub validation_failures: u64,
113}
114
115/// Configuration for the schema registry
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SchemaRegistryConfig {
118    /// Default compatibility mode
119    pub default_compatibility: CompatibilityMode,
120
121    /// Whether to auto-register schemas on first use
122    pub auto_register: bool,
123
124    /// Whether to enforce schema validation on ingestion
125    pub enforce_validation: bool,
126}
127
128impl Default for SchemaRegistryConfig {
129    fn default() -> Self {
130        Self {
131            default_compatibility: CompatibilityMode::Backward,
132            auto_register: false,
133            enforce_validation: false,
134        }
135    }
136}
137
138/// Central registry for managing event schemas
139pub struct SchemaRegistry {
140    /// Schemas organized by subject and version - using DashMap for lock-free concurrent access
141    /// Key: subject -> version -> Schema
142    schemas: Arc<DashMap<String, HashMap<u32, Schema>>>,
143
144    /// Latest version for each subject
145    latest_versions: Arc<DashMap<String, u32>>,
146
147    /// Compatibility mode for each subject
148    compatibility_modes: Arc<DashMap<String, CompatibilityMode>>,
149
150    /// Configuration
151    config: SchemaRegistryConfig,
152
153    /// Statistics
154    stats: Arc<RwLock<SchemaRegistryStats>>,
155}
156
157impl SchemaRegistry {
158    pub fn new(config: SchemaRegistryConfig) -> Self {
159        Self {
160            schemas: Arc::new(DashMap::new()),
161            latest_versions: Arc::new(DashMap::new()),
162            compatibility_modes: Arc::new(DashMap::new()),
163            config,
164            stats: Arc::new(RwLock::new(SchemaRegistryStats {
165                total_schemas: 0,
166                total_subjects: 0,
167                validations_performed: 0,
168                validation_failures: 0,
169            })),
170        }
171    }
172
173    /// Register a new schema or return existing if identical
174    pub fn register_schema(
175        &self,
176        subject: String,
177        schema: JsonValue,
178        description: Option<String>,
179        tags: Option<Vec<String>>,
180    ) -> Result<RegisterSchemaResponse> {
181        // Determine next version
182        let next_version = self
183            .latest_versions
184            .get(&subject)
185            .map(|v| *v + 1)
186            .unwrap_or(1);
187
188        // Check compatibility with previous version if it exists
189        if next_version > 1 {
190            let prev_version = next_version - 1;
191            if let Some(subject_schemas) = self.schemas.get(&subject)
192                && let Some(prev_schema) = subject_schemas.get(&prev_version)
193            {
194                let compatibility = self.get_compatibility_mode(&subject);
195                let check_result =
196                    self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
197
198                if !check_result.compatible {
199                    return Err(AllSourceError::ValidationError(format!(
200                        "Schema compatibility check failed: {}",
201                        check_result.issues.join(", ")
202                    )));
203                }
204            }
205        }
206
207        // Create and store the schema
208        let mut new_schema = Schema::new(subject.clone(), next_version, schema);
209        new_schema.description = description;
210        new_schema.tags = tags.unwrap_or_default();
211
212        let schema_id = new_schema.id;
213        let created_at = new_schema.created_at;
214
215        // Get or create subject entry and insert the schema
216        self.schemas
217            .entry(subject.clone())
218            .or_default()
219            .insert(next_version, new_schema);
220        self.latest_versions.insert(subject.clone(), next_version);
221
222        // Update stats
223        let mut stats = self.stats.write();
224        stats.total_schemas += 1;
225        if next_version == 1 {
226            stats.total_subjects += 1;
227        }
228
229        tracing::info!(
230            "📋 Registered schema v{} for subject '{}' (ID: {})",
231            next_version,
232            subject,
233            schema_id
234        );
235
236        Ok(RegisterSchemaResponse {
237            schema_id,
238            subject,
239            version: next_version,
240            created_at,
241        })
242    }
243
244    /// Get a schema by subject and version (or latest if no version specified)
245    pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
246        let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
247            AllSourceError::ValidationError(format!("Subject not found: {subject}"))
248        })?;
249
250        let version = match version {
251            Some(v) => v,
252            None => *self.latest_versions.get(subject).ok_or_else(|| {
253                AllSourceError::ValidationError(format!("No versions for subject: {subject}"))
254            })?,
255        };
256
257        subject_schemas.get(&version).cloned().ok_or_else(|| {
258            AllSourceError::ValidationError(format!(
259                "Schema version {} not found for subject: {}",
260                version, subject
261            ))
262        })
263    }
264
265    /// List all versions of a schema subject
266    pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
267        let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
268            AllSourceError::ValidationError(format!("Subject not found: {subject}"))
269        })?;
270
271        let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
272        versions.sort_unstable();
273
274        Ok(versions)
275    }
276
277    /// List all schema subjects
278    pub fn list_subjects(&self) -> Vec<String> {
279        self.schemas
280            .iter()
281            .map(|entry| entry.key().clone())
282            .collect()
283    }
284
285    /// Validate a payload against a schema
286    pub fn validate(
287        &self,
288        subject: &str,
289        version: Option<u32>,
290        payload: &JsonValue,
291    ) -> Result<ValidateEventResponse> {
292        let schema = self.get_schema(subject, version)?;
293
294        let validation_result = Self::validate_json(payload, &schema.schema);
295
296        // Update stats
297        let mut stats = self.stats.write();
298        stats.validations_performed += 1;
299        if !validation_result.is_empty() {
300            stats.validation_failures += 1;
301        }
302
303        Ok(ValidateEventResponse {
304            valid: validation_result.is_empty(),
305            errors: validation_result,
306            schema_version: schema.version,
307        })
308    }
309
310    /// Internal JSON Schema validation
311    fn validate_json(data: &JsonValue, schema: &JsonValue) -> Vec<String> {
312        let mut errors = Vec::new();
313
314        // Basic JSON Schema validation
315        // In production, use jsonschema crate, but implementing basic checks here
316
317        // Check required fields
318        if let Some(required) = schema.get("required").and_then(|r| r.as_array())
319            && let Some(obj) = data.as_object()
320        {
321            for req_field in required {
322                if let Some(field_name) = req_field.as_str()
323                    && !obj.contains_key(field_name)
324                {
325                    errors.push(format!("Missing required field: {field_name}"));
326                }
327            }
328        }
329
330        // Check type
331        if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
332            let actual_type = match data {
333                JsonValue::Null => "null",
334                JsonValue::Bool(_) => "boolean",
335                JsonValue::Number(_) => "number",
336                JsonValue::String(_) => "string",
337                JsonValue::Array(_) => "array",
338                JsonValue::Object(_) => "object",
339            };
340
341            if expected_type != actual_type {
342                errors.push(format!(
343                    "Type mismatch: expected {}, got {}",
344                    expected_type, actual_type
345                ));
346            }
347        }
348
349        // Check properties
350        if let (Some(properties), Some(data_obj)) = (
351            schema.get("properties").and_then(|p| p.as_object()),
352            data.as_object(),
353        ) {
354            for (key, value) in data_obj {
355                if let Some(prop_schema) = properties.get(key) {
356                    let nested_errors = Self::validate_json(value, prop_schema);
357                    for err in nested_errors {
358                        errors.push(format!("{key}.{err}"));
359                    }
360                }
361            }
362        }
363
364        errors
365    }
366
367    /// Check compatibility between two schemas
368    fn check_compatibility(
369        &self,
370        old_schema: &JsonValue,
371        new_schema: &JsonValue,
372        mode: CompatibilityMode,
373    ) -> Result<CompatibilityCheckResult> {
374        let mut issues = Vec::new();
375
376        match mode {
377            CompatibilityMode::None => {
378                return Ok(CompatibilityCheckResult {
379                    compatible: true,
380                    compatibility_mode: mode,
381                    issues: Vec::new(),
382                });
383            }
384            CompatibilityMode::Backward => {
385                // Check that all old required fields are still required
386                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
387            }
388            CompatibilityMode::Forward => {
389                // Check that all new required fields were in old schema
390                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
391            }
392            CompatibilityMode::Full => {
393                // Check both directions
394                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
395                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
396            }
397        }
398
399        Ok(CompatibilityCheckResult {
400            compatible: issues.is_empty(),
401            compatibility_mode: mode,
402            issues,
403        })
404    }
405
406    fn check_backward_compatibility(
407        &self,
408        old_schema: &JsonValue,
409        new_schema: &JsonValue,
410    ) -> Vec<String> {
411        let mut issues = Vec::new();
412
413        // Get required fields from old schema
414        if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
415            let new_required = new_schema
416                .get("required")
417                .and_then(|r| r.as_array())
418                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
419                .unwrap_or_default();
420
421            for old_req in old_required {
422                if let Some(field_name) = old_req.as_str()
423                    && !new_required.contains(&field_name)
424                {
425                    issues.push(format!(
426                        "Backward compatibility: required field '{}' removed",
427                        field_name
428                    ));
429                }
430            }
431        }
432
433        issues
434    }
435
436    fn check_forward_compatibility(
437        &self,
438        old_schema: &JsonValue,
439        new_schema: &JsonValue,
440    ) -> Vec<String> {
441        let mut issues = Vec::new();
442
443        // Get required fields from new schema
444        if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
445            let old_required = old_schema
446                .get("required")
447                .and_then(|r| r.as_array())
448                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
449                .unwrap_or_default();
450
451            for new_req in new_required {
452                if let Some(field_name) = new_req.as_str()
453                    && !old_required.contains(&field_name)
454                {
455                    issues.push(format!(
456                        "Forward compatibility: new required field '{}' added",
457                        field_name
458                    ));
459                }
460            }
461        }
462
463        issues
464    }
465
466    /// Set compatibility mode for a subject
467    pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
468        self.compatibility_modes.insert(subject, mode);
469    }
470
471    /// Get compatibility mode for a subject (or default)
472    pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
473        self.compatibility_modes
474            .get(subject)
475            .map(|entry| *entry.value())
476            .unwrap_or(self.config.default_compatibility)
477    }
478
479    /// Delete a specific schema version
480    pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
481        if let Some(mut subject_schemas) = self.schemas.get_mut(subject)
482            && subject_schemas.remove(&version).is_some()
483        {
484            tracing::info!("🗑️  Deleted schema v{} for subject '{}'", version, subject);
485
486            // Update stats
487            let mut stats = self.stats.write();
488            stats.total_schemas = stats.total_schemas.saturating_sub(1);
489
490            return Ok(true);
491        }
492
493        Ok(false)
494    }
495
496    /// Get registry statistics
497    pub fn stats(&self) -> SchemaRegistryStats {
498        self.stats.read().clone()
499    }
500
501    /// Get registry configuration
502    pub fn config(&self) -> &SchemaRegistryConfig {
503        &self.config
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Object schema requiring string `user_id` and `email` fields.
    fn user_created_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["user_id", "email"]
        })
    }

    #[test]
    fn test_schema_registration() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        let response = registry
            .register_schema(
                "user.created".to_string(),
                user_created_schema(),
                Some("User creation event".to_string()),
                None,
            )
            .unwrap();

        assert_eq!(response.version, 1);
        assert_eq!(response.subject, "user.created");
    }

    #[test]
    fn test_schema_validation() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        registry
            .register_schema("user.created".to_string(), user_created_schema(), None, None)
            .unwrap();

        // Valid payload
        let valid_payload = json!({
            "user_id": "123",
            "email": "test@example.com"
        });

        let result = registry
            .validate("user.created", None, &valid_payload)
            .unwrap();
        assert!(result.valid);

        // Invalid payload (missing required field)
        let invalid_payload = json!({
            "user_id": "123"
        });

        let result = registry
            .validate("user.created", None, &invalid_payload)
            .unwrap();
        assert!(!result.valid);
        assert!(!result.errors.is_empty());
    }

    #[test]
    fn test_backward_compatibility() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig {
            default_compatibility: CompatibilityMode::Backward,
            ..Default::default()
        });

        let schema_v1 = json!({
            "type": "object",
            "required": ["user_id", "email"]
        });

        registry
            .register_schema("user.created".to_string(), schema_v1, None, None)
            .unwrap();

        // Compatible: adding an optional field while keeping every required field.
        let schema_v2 = json!({
            "type": "object",
            "properties": {
                "nickname": {"type": "string"}
            },
            "required": ["user_id", "email"]
        });

        let result = registry.register_schema("user.created".to_string(), schema_v2, None, None);
        assert_eq!(result.unwrap().version, 2);

        // Incompatible: removing required field
        let schema_v3 = json!({
            "type": "object",
            "required": ["user_id"]
        });

        let result = registry.register_schema("user.created".to_string(), schema_v3, None, None);
        assert!(result.is_err());
    }

    #[test]
    fn test_forward_compatibility() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
        registry.set_compatibility_mode("order.placed".to_string(), CompatibilityMode::Forward);
        assert_eq!(
            registry.get_compatibility_mode("order.placed"),
            CompatibilityMode::Forward
        );

        registry
            .register_schema(
                "order.placed".to_string(),
                json!({"type": "object", "required": ["order_id"]}),
                None,
                None,
            )
            .unwrap();

        // Incompatible: introduces a new required field.
        let result = registry.register_schema(
            "order.placed".to_string(),
            json!({"type": "object", "required": ["order_id", "amount"]}),
            None,
            None,
        );
        assert!(result.is_err());

        // Compatible: no new required fields are introduced.
        let result = registry.register_schema(
            "order.placed".to_string(),
            json!({"type": "object"}),
            None,
            None,
        );
        assert!(result.is_ok());
    }
}