// allsource_core/application/services/schema.rs

1use crate::error::{AllSourceError, Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use serde_json::Value as JsonValue;
7use std::{collections::HashMap, sync::Arc};
8use uuid::Uuid;
9
10/// Compatibility mode for schema evolution
11#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
13pub enum CompatibilityMode {
14    /// No compatibility checking
15    None,
16    /// New schema must be backward compatible (new fields optional)
17    #[default]
18    Backward,
19    /// New schema must be forward compatible (old fields preserved)
20    Forward,
21    /// New schema must be both backward and forward compatible
22    Full,
23}
24
25/// Schema definition with versioning
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Schema {
28    /// Unique schema ID
29    pub id: Uuid,
30
31    /// Subject/topic name (e.g., "user.created", "order.placed")
32    pub subject: String,
33
34    /// Schema version number
35    pub version: u32,
36
37    /// JSON Schema definition
38    pub schema: JsonValue,
39
40    /// When this schema was registered
41    pub created_at: DateTime<Utc>,
42
43    /// Schema description/documentation
44    pub description: Option<String>,
45
46    /// Tags for organization
47    pub tags: Vec<String>,
48}
49
50impl Schema {
51    pub fn new(subject: String, version: u32, schema: JsonValue) -> Self {
52        Self {
53            id: Uuid::new_v4(),
54            subject,
55            version,
56            schema,
57            created_at: Utc::now(),
58            description: None,
59            tags: Vec::new(),
60        }
61    }
62}
63
64/// Request to register a new schema
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct RegisterSchemaRequest {
67    pub subject: String,
68    pub schema: JsonValue,
69    pub description: Option<String>,
70    pub tags: Option<Vec<String>>,
71}
72
73/// Response from schema registration
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct RegisterSchemaResponse {
76    pub schema_id: Uuid,
77    pub subject: String,
78    pub version: u32,
79    pub created_at: DateTime<Utc>,
80}
81
82/// Request to validate an event against a schema
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ValidateEventRequest {
85    pub subject: String,
86    pub version: Option<u32>,
87    pub payload: JsonValue,
88}
89
90/// Response from event validation
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct ValidateEventResponse {
93    pub valid: bool,
94    pub errors: Vec<String>,
95    pub schema_version: u32,
96}
97
98/// Schema compatibility check result
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct CompatibilityCheckResult {
101    pub compatible: bool,
102    pub compatibility_mode: CompatibilityMode,
103    pub issues: Vec<String>,
104}
105
106/// Statistics about the schema registry
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct SchemaRegistryStats {
109    pub total_schemas: usize,
110    pub total_subjects: usize,
111    pub validations_performed: u64,
112    pub validation_failures: u64,
113}
114
115/// Configuration for the schema registry
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SchemaRegistryConfig {
118    /// Default compatibility mode
119    pub default_compatibility: CompatibilityMode,
120
121    /// Whether to auto-register schemas on first use
122    pub auto_register: bool,
123
124    /// Whether to enforce schema validation on ingestion
125    pub enforce_validation: bool,
126}
127
128impl Default for SchemaRegistryConfig {
129    fn default() -> Self {
130        Self {
131            default_compatibility: CompatibilityMode::Backward,
132            auto_register: false,
133            enforce_validation: false,
134        }
135    }
136}
137
138/// Central registry for managing event schemas
139pub struct SchemaRegistry {
140    /// Schemas organized by subject and version - using DashMap for lock-free concurrent access
141    /// Key: subject -> version -> Schema
142    schemas: Arc<DashMap<String, HashMap<u32, Schema>>>,
143
144    /// Latest version for each subject
145    latest_versions: Arc<DashMap<String, u32>>,
146
147    /// Compatibility mode for each subject
148    compatibility_modes: Arc<DashMap<String, CompatibilityMode>>,
149
150    /// Configuration
151    config: SchemaRegistryConfig,
152
153    /// Statistics
154    stats: Arc<RwLock<SchemaRegistryStats>>,
155}
156
157impl SchemaRegistry {
158    pub fn new(config: SchemaRegistryConfig) -> Self {
159        Self {
160            schemas: Arc::new(DashMap::new()),
161            latest_versions: Arc::new(DashMap::new()),
162            compatibility_modes: Arc::new(DashMap::new()),
163            config,
164            stats: Arc::new(RwLock::new(SchemaRegistryStats {
165                total_schemas: 0,
166                total_subjects: 0,
167                validations_performed: 0,
168                validation_failures: 0,
169            })),
170        }
171    }
172
173    /// Register a new schema or return existing if identical
174    pub fn register_schema(
175        &self,
176        subject: String,
177        schema: JsonValue,
178        description: Option<String>,
179        tags: Option<Vec<String>>,
180    ) -> Result<RegisterSchemaResponse> {
181        // Determine next version
182        let next_version = self.latest_versions.get(&subject).map_or(1, |v| *v + 1);
183
184        // Check compatibility with previous version if it exists
185        if next_version > 1 {
186            let prev_version = next_version - 1;
187            if let Some(subject_schemas) = self.schemas.get(&subject)
188                && let Some(prev_schema) = subject_schemas.get(&prev_version)
189            {
190                let compatibility = self.get_compatibility_mode(&subject);
191                let check_result =
192                    self.check_compatibility(&prev_schema.schema, &schema, compatibility)?;
193
194                if !check_result.compatible {
195                    return Err(AllSourceError::ValidationError(format!(
196                        "Schema compatibility check failed: {}",
197                        check_result.issues.join(", ")
198                    )));
199                }
200            }
201        }
202
203        // Create and store the schema
204        let mut new_schema = Schema::new(subject.clone(), next_version, schema);
205        new_schema.description = description;
206        new_schema.tags = tags.unwrap_or_default();
207
208        let schema_id = new_schema.id;
209        let created_at = new_schema.created_at;
210
211        // Get or create subject entry and insert the schema
212        self.schemas
213            .entry(subject.clone())
214            .or_default()
215            .insert(next_version, new_schema);
216        self.latest_versions.insert(subject.clone(), next_version);
217
218        // Update stats
219        let mut stats = self.stats.write();
220        stats.total_schemas += 1;
221        if next_version == 1 {
222            stats.total_subjects += 1;
223        }
224
225        tracing::info!(
226            "📋 Registered schema v{} for subject '{}' (ID: {})",
227            next_version,
228            subject,
229            schema_id
230        );
231
232        Ok(RegisterSchemaResponse {
233            schema_id,
234            subject,
235            version: next_version,
236            created_at,
237        })
238    }
239
240    /// Get a schema by subject and version (or latest if no version specified)
241    pub fn get_schema(&self, subject: &str, version: Option<u32>) -> Result<Schema> {
242        let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
243            AllSourceError::ValidationError(format!("Subject not found: {subject}"))
244        })?;
245
246        let version = match version {
247            Some(v) => v,
248            None => *self.latest_versions.get(subject).ok_or_else(|| {
249                AllSourceError::ValidationError(format!("No versions for subject: {subject}"))
250            })?,
251        };
252
253        subject_schemas.get(&version).cloned().ok_or_else(|| {
254            AllSourceError::ValidationError(format!(
255                "Schema version {version} not found for subject: {subject}"
256            ))
257        })
258    }
259
260    /// List all versions of a schema subject
261    pub fn list_versions(&self, subject: &str) -> Result<Vec<u32>> {
262        let subject_schemas = self.schemas.get(subject).ok_or_else(|| {
263            AllSourceError::ValidationError(format!("Subject not found: {subject}"))
264        })?;
265
266        let mut versions: Vec<u32> = subject_schemas.keys().copied().collect();
267        versions.sort_unstable();
268
269        Ok(versions)
270    }
271
272    /// List all schema subjects
273    pub fn list_subjects(&self) -> Vec<String> {
274        self.schemas
275            .iter()
276            .map(|entry| entry.key().clone())
277            .collect()
278    }
279
280    /// Validate a payload against a schema
281    pub fn validate(
282        &self,
283        subject: &str,
284        version: Option<u32>,
285        payload: &JsonValue,
286    ) -> Result<ValidateEventResponse> {
287        let schema = self.get_schema(subject, version)?;
288
289        let validation_result = Self::validate_json(payload, &schema.schema);
290
291        // Update stats
292        let mut stats = self.stats.write();
293        stats.validations_performed += 1;
294        if !validation_result.is_empty() {
295            stats.validation_failures += 1;
296        }
297
298        Ok(ValidateEventResponse {
299            valid: validation_result.is_empty(),
300            errors: validation_result,
301            schema_version: schema.version,
302        })
303    }
304
305    /// Internal JSON Schema validation
306    fn validate_json(data: &JsonValue, schema: &JsonValue) -> Vec<String> {
307        let mut errors = Vec::new();
308
309        // Basic JSON Schema validation
310        // In production, use jsonschema crate, but implementing basic checks here
311
312        // Check required fields
313        if let Some(required) = schema.get("required").and_then(|r| r.as_array())
314            && let Some(obj) = data.as_object()
315        {
316            for req_field in required {
317                if let Some(field_name) = req_field.as_str()
318                    && !obj.contains_key(field_name)
319                {
320                    errors.push(format!("Missing required field: {field_name}"));
321                }
322            }
323        }
324
325        // Check type
326        if let Some(expected_type) = schema.get("type").and_then(|t| t.as_str()) {
327            let actual_type = match data {
328                JsonValue::Null => "null",
329                JsonValue::Bool(_) => "boolean",
330                JsonValue::Number(_) => "number",
331                JsonValue::String(_) => "string",
332                JsonValue::Array(_) => "array",
333                JsonValue::Object(_) => "object",
334            };
335
336            if expected_type != actual_type {
337                errors.push(format!(
338                    "Type mismatch: expected {expected_type}, got {actual_type}"
339                ));
340            }
341        }
342
343        // Check properties
344        if let (Some(properties), Some(data_obj)) = (
345            schema.get("properties").and_then(|p| p.as_object()),
346            data.as_object(),
347        ) {
348            for (key, value) in data_obj {
349                if let Some(prop_schema) = properties.get(key) {
350                    let nested_errors = Self::validate_json(value, prop_schema);
351                    for err in nested_errors {
352                        errors.push(format!("{key}.{err}"));
353                    }
354                }
355            }
356        }
357
358        errors
359    }
360
361    /// Check compatibility between two schemas
362    fn check_compatibility(
363        &self,
364        old_schema: &JsonValue,
365        new_schema: &JsonValue,
366        mode: CompatibilityMode,
367    ) -> Result<CompatibilityCheckResult> {
368        let mut issues = Vec::new();
369
370        match mode {
371            CompatibilityMode::None => {
372                return Ok(CompatibilityCheckResult {
373                    compatible: true,
374                    compatibility_mode: mode,
375                    issues: Vec::new(),
376                });
377            }
378            CompatibilityMode::Backward => {
379                // Check that all old required fields are still required
380                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
381            }
382            CompatibilityMode::Forward => {
383                // Check that all new required fields were in old schema
384                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
385            }
386            CompatibilityMode::Full => {
387                // Check both directions
388                issues.extend(self.check_backward_compatibility(old_schema, new_schema));
389                issues.extend(self.check_forward_compatibility(old_schema, new_schema));
390            }
391        }
392
393        Ok(CompatibilityCheckResult {
394            compatible: issues.is_empty(),
395            compatibility_mode: mode,
396            issues,
397        })
398    }
399
400    fn check_backward_compatibility(
401        &self,
402        old_schema: &JsonValue,
403        new_schema: &JsonValue,
404    ) -> Vec<String> {
405        let mut issues = Vec::new();
406
407        // Get required fields from old schema
408        if let Some(old_required) = old_schema.get("required").and_then(|r| r.as_array()) {
409            let new_required = new_schema
410                .get("required")
411                .and_then(|r| r.as_array())
412                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
413                .unwrap_or_default();
414
415            for old_req in old_required {
416                if let Some(field_name) = old_req.as_str()
417                    && !new_required.contains(&field_name)
418                {
419                    issues.push(format!(
420                        "Backward compatibility: required field '{field_name}' removed"
421                    ));
422                }
423            }
424        }
425
426        issues
427    }
428
429    fn check_forward_compatibility(
430        &self,
431        old_schema: &JsonValue,
432        new_schema: &JsonValue,
433    ) -> Vec<String> {
434        let mut issues = Vec::new();
435
436        // Get required fields from new schema
437        if let Some(new_required) = new_schema.get("required").and_then(|r| r.as_array()) {
438            let old_required = old_schema
439                .get("required")
440                .and_then(|r| r.as_array())
441                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
442                .unwrap_or_default();
443
444            for new_req in new_required {
445                if let Some(field_name) = new_req.as_str()
446                    && !old_required.contains(&field_name)
447                {
448                    issues.push(format!(
449                        "Forward compatibility: new required field '{field_name}' added"
450                    ));
451                }
452            }
453        }
454
455        issues
456    }
457
458    /// Set compatibility mode for a subject
459    pub fn set_compatibility_mode(&self, subject: String, mode: CompatibilityMode) {
460        self.compatibility_modes.insert(subject, mode);
461    }
462
463    /// Get compatibility mode for a subject (or default)
464    pub fn get_compatibility_mode(&self, subject: &str) -> CompatibilityMode {
465        self.compatibility_modes
466            .get(subject)
467            .map_or(self.config.default_compatibility, |entry| *entry.value())
468    }
469
470    /// Delete a specific schema version
471    pub fn delete_schema(&self, subject: &str, version: u32) -> Result<bool> {
472        if let Some(mut subject_schemas) = self.schemas.get_mut(subject)
473            && subject_schemas.remove(&version).is_some()
474        {
475            tracing::info!("🗑️  Deleted schema v{} for subject '{}'", version, subject);
476
477            // Update stats
478            let mut stats = self.stats.write();
479            stats.total_schemas = stats.total_schemas.saturating_sub(1);
480
481            return Ok(true);
482        }
483
484        Ok(false)
485    }
486
487    /// Get registry statistics
488    pub fn stats(&self) -> SchemaRegistryStats {
489        self.stats.read().clone()
490    }
491
492    /// Get registry configuration
493    pub fn config(&self) -> &SchemaRegistryConfig {
494        &self.config
495    }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Object schema with two required string fields, shared by several tests.
    fn user_created_schema() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["user_id", "email"]
        })
    }

    /// v1 plus an optional `nickname` property; compatible in every mode.
    fn user_created_schema_with_nickname() -> JsonValue {
        json!({
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "email": {"type": "string"},
                "nickname": {"type": "string"}
            },
            "required": ["user_id", "email"]
        })
    }

    #[test]
    fn test_schema_registration() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());

        let response = registry
            .register_schema(
                "user.created".to_string(),
                user_created_schema(),
                Some("User creation event".to_string()),
                None,
            )
            .unwrap();

        assert_eq!(response.version, 1);
        assert_eq!(response.subject, "user.created");
    }

    #[test]
    fn test_schema_validation() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
        registry
            .register_schema("user.created".to_string(), user_created_schema(), None, None)
            .unwrap();

        // Valid payload
        let valid_payload = json!({
            "user_id": "123",
            "email": "test@example.com"
        });
        let result = registry
            .validate("user.created", None, &valid_payload)
            .unwrap();
        assert!(result.valid);

        // Invalid payload (missing required field)
        let invalid_payload = json!({
            "user_id": "123"
        });
        let result = registry
            .validate("user.created", None, &invalid_payload)
            .unwrap();
        assert!(!result.valid);
        assert!(!result.errors.is_empty());
    }

    #[test]
    fn test_nested_type_mismatch_is_prefixed_with_field_name() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
        registry
            .register_schema("user.created".to_string(), user_created_schema(), None, None)
            .unwrap();

        let payload = json!({"user_id": 123, "email": "test@example.com"});
        let result = registry.validate("user.created", None, &payload).unwrap();

        assert!(!result.valid);
        assert_eq!(
            result.errors,
            vec!["user_id.Type mismatch: expected string, got number".to_string()]
        );
        assert_eq!(result.schema_version, 1);

        let stats = registry.stats();
        assert_eq!(stats.validations_performed, 1);
        assert_eq!(stats.validation_failures, 1);
    }

    #[test]
    fn test_backward_compatibility() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig {
            default_compatibility: CompatibilityMode::Backward,
            ..Default::default()
        });

        registry
            .register_schema("user.created".to_string(), user_created_schema(), None, None)
            .unwrap();

        // Compatible: adding an optional field keeps every required field.
        let response = registry
            .register_schema(
                "user.created".to_string(),
                user_created_schema_with_nickname(),
                None,
                None,
            )
            .unwrap();
        assert_eq!(response.version, 2);

        // Incompatible: removing a required field.
        let schema_v3 = json!({
            "type": "object",
            "required": ["user_id"]
        });
        let result = registry.register_schema("user.created".to_string(), schema_v3, None, None);
        assert!(result.is_err());
        assert_eq!(registry.list_versions("user.created").unwrap(), vec![1, 2]);
    }

    #[test]
    fn test_forward_compatibility_rejects_new_required_field() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig {
            default_compatibility: CompatibilityMode::Forward,
            ..Default::default()
        });

        registry
            .register_schema(
                "order.placed".to_string(),
                json!({"type": "object", "required": ["order_id"]}),
                None,
                None,
            )
            .unwrap();

        let rejected = registry.register_schema(
            "order.placed".to_string(),
            json!({"type": "object", "required": ["order_id", "amount"]}),
            None,
            None,
        );
        assert!(rejected.is_err());

        // Dropping a required field passes the forward check.
        let accepted = registry
            .register_schema(
                "order.placed".to_string(),
                json!({"type": "object", "required": []}),
                None,
                None,
            )
            .unwrap();
        assert_eq!(accepted.version, 2);
    }

    #[test]
    fn test_compatibility_mode_none_accepts_any_change() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
        registry.set_compatibility_mode("user.created".to_string(), CompatibilityMode::None);

        assert_eq!(
            registry.get_compatibility_mode("user.created"),
            CompatibilityMode::None
        );
        assert_eq!(
            registry.get_compatibility_mode("other.subject"),
            CompatibilityMode::Backward
        );

        registry
            .register_schema("user.created".to_string(), user_created_schema(), None, None)
            .unwrap();
        let response = registry
            .register_schema("user.created".to_string(), json!({"type": "object"}), None, None)
            .unwrap();
        assert_eq!(response.version, 2);
    }

    #[test]
    fn test_get_list_and_delete() {
        let registry = SchemaRegistry::new(SchemaRegistryConfig::default());
        registry
            .register_schema(
                "user.created".to_string(),
                user_created_schema(),
                Some("v1".to_string()),
                Some(vec!["users".to_string()]),
            )
            .unwrap();
        registry
            .register_schema(
                "user.created".to_string(),
                user_created_schema_with_nickname(),
                None,
                None,
            )
            .unwrap();

        let first = registry.get_schema("user.created", Some(1)).unwrap();
        assert_eq!(first.version, 1);
        assert_eq!(first.description.as_deref(), Some("v1"));
        assert_eq!(first.tags, vec!["users".to_string()]);
        assert_eq!(registry.get_schema("user.created", None).unwrap().version, 2);
        assert_eq!(registry.list_subjects(), vec!["user.created".to_string()]);
        assert!(registry.get_schema("missing", None).is_err());
        assert!(registry.get_schema("user.created", Some(9)).is_err());

        assert!(registry.delete_schema("user.created", 1).unwrap());
        assert!(!registry.delete_schema("user.created", 1).unwrap());
        assert!(!registry.delete_schema("missing", 1).unwrap());
        assert_eq!(registry.list_versions("user.created").unwrap(), vec![2]);

        let stats = registry.stats();
        assert_eq!(stats.total_schemas, 1);
        assert_eq!(stats.total_subjects, 1);
    }
}