Skip to main content

allsource_core/application/services/
schema_evolution.rs

1//! Autonomous schema evolution
2//!
3//! Automatically detects schema changes from ingested events and manages
4//! schema evolution without manual intervention. Works alongside the existing
5//! `SchemaRegistry` to:
6//!
7//! 1. **Infer schemas** from event payloads on first ingestion of a new event type
8//! 2. **Detect drift** when new events have fields not in the registered schema
9//! 3. **Auto-register** backward-compatible schema versions (new optional fields)
10//! 4. **Flag breaking changes** that require manual review (type changes, removed required fields)
11//!
12//! ## How it works
13//!
14//! On each event ingestion, the `SchemaEvolutionManager` compares the event's
15//! payload structure against the latest registered schema for that event type.
16//! If differences are detected:
17//! - New optional fields → auto-registered as a new schema version
18//! - Type changes or missing required fields → flagged as breaking, not auto-registered
19//!
20//! ## Limitations
21//!
//! - Schema inference covers top-level fields and one level of nested fields; drift detection compares top-level fields only
23//! - Type inference is basic: string, number, boolean, array, object, null
24//! - Does not handle polymorphic types (field sometimes string, sometimes number)
25//! - No automatic data migration — schemas describe shape, don't transform data
26
27use chrono::{DateTime, Utc};
28use dashmap::DashMap;
29use serde::{Deserialize, Serialize};
30use serde_json::Value as JsonValue;
31use std::collections::{BTreeMap, BTreeSet};
32use uuid::Uuid;
33
/// Schema evolution event types
///
/// This is the value returned by `SchemaEvolutionManager::analyze_event` and
/// stored in each `EvolutionRecord` to describe what happened for an event.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum EvolutionAction {
    /// Schema was inferred from first event of this type
    Inferred,
    /// Backward-compatible change auto-registered
    AutoEvolved,
    /// Breaking change detected, requires manual review
    BreakingChangeDetected,
}
44
/// A record of a schema evolution event
///
/// One record is appended to the per-event-type history each time the manager
/// infers a schema, auto-evolves it, or detects a breaking change.
#[derive(Debug, Clone, Serialize)]
pub struct EvolutionRecord {
    /// Random (v4) identifier generated when the record is created
    pub id: Uuid,
    /// Event type this record belongs to
    pub event_type: String,
    /// What the manager did for the triggering event
    pub action: EvolutionAction,
    /// Schema version before the event; `None` for the first inferred schema
    pub from_version: Option<u32>,
    /// Schema version after the event; `None` when a breaking change was
    /// detected, because no new version is registered in that case
    pub to_version: Option<u32>,
    /// Top-level field names present in the payload but not in the schema
    pub added_fields: Vec<String>,
    /// Top-level field names present in the schema but not in the payload
    /// (left empty on auto-evolved records)
    pub removed_fields: Vec<String>,
    /// Fields whose inferred type differs from the schema
    /// (left empty on auto-evolved records)
    pub type_changes: Vec<FieldTypeChange>,
    /// UTC time at which the record was created
    pub timestamp: DateTime<Utc>,
}
58
/// A field type change (potential breaking change)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FieldTypeChange {
    /// Name of the top-level field whose type changed
    pub field: String,
    /// Display name of the type recorded in the existing schema (e.g. `"number"`)
    pub old_type: String,
    /// Display name of the type inferred from the new payload (e.g. `"string"`)
    pub new_type: String,
}
66
/// Inferred JSON type for a field
///
/// The variants correspond one-to-one to the variants of `serde_json::Value`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum InferredType {
    /// JSON string
    String,
    /// JSON number (integers and floats are not distinguished)
    Number,
    /// JSON `true` / `false`
    Boolean,
    /// JSON array (element types are not inspected)
    Array,
    /// JSON object
    Object,
    /// JSON `null`
    Null,
}
77
78impl std::fmt::Display for InferredType {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            InferredType::String => write!(f, "string"),
82            InferredType::Number => write!(f, "number"),
83            InferredType::Boolean => write!(f, "boolean"),
84            InferredType::Array => write!(f, "array"),
85            InferredType::Object => write!(f, "object"),
86            InferredType::Null => write!(f, "null"),
87        }
88    }
89}
90
91/// Infer the type of a JSON value
92fn infer_type(value: &JsonValue) -> InferredType {
93    match value {
94        JsonValue::Null => InferredType::Null,
95        JsonValue::Bool(_) => InferredType::Boolean,
96        JsonValue::Number(_) => InferredType::Number,
97        JsonValue::String(_) => InferredType::String,
98        JsonValue::Array(_) => InferredType::Array,
99        JsonValue::Object(_) => InferredType::Object,
100    }
101}
102
/// An inferred field schema
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldSchema {
    /// Field name (the JSON object key it was inferred from)
    pub name: String,
    /// Kind of JSON value observed for this field
    pub inferred_type: InferredType,
    /// `true` when the observed value was `null`, or when the field was added
    /// to an existing schema by auto-evolution (new fields are optional)
    pub nullable: bool,
    /// Nested fields (only if type is Object)
    ///
    /// Populated one level deep by `infer_schema`; the nested entries
    /// themselves always have an empty `nested` map.
    pub nested: BTreeMap<String, FieldSchema>,
}
112
/// Schema shape inferred from a payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InferredSchema {
    /// Top-level fields keyed by field name (sorted, since this is a `BTreeMap`)
    pub fields: BTreeMap<String, FieldSchema>,
}
118
/// Diff between two schemas
///
/// Produced by `compute_diff`; only top-level fields are compared.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaDiff {
    /// Field names present in the new schema but not in the existing one
    pub added_fields: Vec<String>,
    /// Field names present in the existing schema but not in the new one
    pub removed_fields: Vec<String>,
    /// Fields present in both whose types differ and neither side is `null`
    pub type_changes: Vec<FieldTypeChange>,
    /// `true` when there are no type changes and no non-nullable field was removed
    pub is_backward_compatible: bool,
}
127
128/// Infer a schema shape from a JSON payload
129pub fn infer_schema(payload: &JsonValue) -> InferredSchema {
130    let mut fields = BTreeMap::new();
131    if let Some(obj) = payload.as_object() {
132        for (key, value) in obj {
133            let mut nested = BTreeMap::new();
134            if let Some(inner_obj) = value.as_object() {
135                for (nk, nv) in inner_obj {
136                    nested.insert(
137                        nk.clone(),
138                        FieldSchema {
139                            name: nk.clone(),
140                            inferred_type: infer_type(nv),
141                            nullable: nv.is_null(),
142                            nested: BTreeMap::new(),
143                        },
144                    );
145                }
146            }
147            fields.insert(
148                key.clone(),
149                FieldSchema {
150                    name: key.clone(),
151                    inferred_type: infer_type(value),
152                    nullable: value.is_null(),
153                    nested,
154                },
155            );
156        }
157    }
158    InferredSchema { fields }
159}
160
161/// Convert an inferred schema to a JSON Schema document
162pub fn to_json_schema(schema: &InferredSchema) -> JsonValue {
163    let mut properties = serde_json::Map::new();
164    let mut required = Vec::new();
165
166    for (name, field) in &schema.fields {
167        let type_str = match field.inferred_type {
168            InferredType::String => "string",
169            InferredType::Number => "number",
170            InferredType::Boolean => "boolean",
171            InferredType::Array => "array",
172            InferredType::Object => "object",
173            InferredType::Null => "null",
174        };
175
176        let mut prop = serde_json::json!({"type": type_str});
177
178        // Add nested properties for objects
179        if field.inferred_type == InferredType::Object && !field.nested.is_empty() {
180            let nested_schema = InferredSchema {
181                fields: field.nested.clone(),
182            };
183            let nested_json = to_json_schema(&nested_schema);
184            if let Some(np) = nested_json.get("properties") {
185                prop["properties"] = np.clone();
186            }
187        }
188
189        properties.insert(name.clone(), prop);
190        if !field.nullable {
191            required.push(JsonValue::String(name.clone()));
192        }
193    }
194
195    serde_json::json!({
196        "type": "object",
197        "properties": properties,
198        "required": required,
199    })
200}
201
202/// Compute the diff between an existing schema and a new payload's inferred schema
203pub fn compute_diff(existing: &InferredSchema, new: &InferredSchema) -> SchemaDiff {
204    let existing_keys: BTreeSet<&String> = existing.fields.keys().collect();
205    let new_keys: BTreeSet<&String> = new.fields.keys().collect();
206
207    let added: Vec<String> = new_keys
208        .difference(&existing_keys)
209        .map(|k| (*k).clone())
210        .collect();
211    let removed: Vec<String> = existing_keys
212        .difference(&new_keys)
213        .map(|k| (*k).clone())
214        .collect();
215
216    let mut type_changes = Vec::new();
217    for key in existing_keys.intersection(&new_keys) {
218        let old_field = &existing.fields[*key];
219        let new_field = &new.fields[*key];
220        if old_field.inferred_type != new_field.inferred_type
221            && old_field.inferred_type != InferredType::Null
222            && new_field.inferred_type != InferredType::Null
223        {
224            type_changes.push(FieldTypeChange {
225                field: (*key).clone(),
226                old_type: old_field.inferred_type.to_string(),
227                new_type: new_field.inferred_type.to_string(),
228            });
229        }
230    }
231
232    // Backward compatible = no type changes, no removed required fields
233    let removed_required = removed
234        .iter()
235        .any(|r| existing.fields.get(r).is_some_and(|f| !f.nullable));
236
237    let is_backward_compatible = type_changes.is_empty() && !removed_required;
238
239    SchemaDiff {
240        added_fields: added,
241        removed_fields: removed,
242        type_changes,
243        is_backward_compatible,
244    }
245}
246
/// Schema evolution manager — tracks inferred schemas and evolution history
///
/// All state lives in three `DashMap`s keyed by event type, so every method
/// takes `&self`.
// NOTE(review): the three maps are updated by separate operations, so they are
// presumably not guaranteed to be mutually consistent under concurrent calls
// for the same event type — confirm against the intended usage.
pub struct SchemaEvolutionManager {
    /// Latest inferred schema per event type
    schemas: DashMap<String, InferredSchema>,
    /// Evolution history
    history: DashMap<String, Vec<EvolutionRecord>>,
    /// Version counter per event type
    versions: DashMap<String, u32>,
}
256
impl Default for SchemaEvolutionManager {
    /// Equivalent to `SchemaEvolutionManager::new()`: a manager with no tracked event types.
    fn default() -> Self {
        Self::new()
    }
}
262
263impl SchemaEvolutionManager {
264    pub fn new() -> Self {
265        Self {
266            schemas: DashMap::new(),
267            history: DashMap::new(),
268            versions: DashMap::new(),
269        }
270    }
271
272    /// Analyze an event and evolve the schema if needed
273    ///
274    /// Returns the evolution action taken (if any)
275    pub fn analyze_event(&self, event_type: &str, payload: &JsonValue) -> Option<EvolutionAction> {
276        let new_schema = infer_schema(payload);
277
278        if !self.schemas.contains_key(event_type) {
279            // First event of this type — infer and register
280            self.schemas.insert(event_type.to_string(), new_schema);
281            self.versions.insert(event_type.to_string(), 1);
282            self.history
283                .entry(event_type.to_string())
284                .or_default()
285                .push(EvolutionRecord {
286                    id: Uuid::new_v4(),
287                    event_type: event_type.to_string(),
288                    action: EvolutionAction::Inferred,
289                    from_version: None,
290                    to_version: Some(1),
291                    added_fields: vec![],
292                    removed_fields: vec![],
293                    type_changes: vec![],
294                    timestamp: Utc::now(),
295                });
296            return Some(EvolutionAction::Inferred);
297        }
298
299        // Compare with existing schema
300        let existing = self.schemas.get(event_type).unwrap().clone();
301        let diff = compute_diff(&existing, &new_schema);
302
303        // No changes
304        if diff.added_fields.is_empty()
305            && diff.removed_fields.is_empty()
306            && diff.type_changes.is_empty()
307        {
308            return None;
309        }
310
311        let current_version = *self.versions.get(event_type).unwrap();
312
313        if diff.is_backward_compatible {
314            // Auto-evolve: merge new fields into existing schema
315            let mut merged = existing;
316            for (key, field) in new_schema.fields {
317                merged.fields.entry(key).or_insert(FieldSchema {
318                    nullable: true, // New fields are optional
319                    ..field
320                });
321            }
322
323            let new_version = current_version + 1;
324            self.schemas.insert(event_type.to_string(), merged);
325            self.versions.insert(event_type.to_string(), new_version);
326            self.history
327                .entry(event_type.to_string())
328                .or_default()
329                .push(EvolutionRecord {
330                    id: Uuid::new_v4(),
331                    event_type: event_type.to_string(),
332                    action: EvolutionAction::AutoEvolved,
333                    from_version: Some(current_version),
334                    to_version: Some(new_version),
335                    added_fields: diff.added_fields,
336                    removed_fields: vec![],
337                    type_changes: vec![],
338                    timestamp: Utc::now(),
339                });
340            Some(EvolutionAction::AutoEvolved)
341        } else {
342            // Breaking change — record but don't auto-evolve
343            self.history
344                .entry(event_type.to_string())
345                .or_default()
346                .push(EvolutionRecord {
347                    id: Uuid::new_v4(),
348                    event_type: event_type.to_string(),
349                    action: EvolutionAction::BreakingChangeDetected,
350                    from_version: Some(current_version),
351                    to_version: None,
352                    added_fields: diff.added_fields,
353                    removed_fields: diff.removed_fields,
354                    type_changes: diff.type_changes,
355                    timestamp: Utc::now(),
356                });
357            Some(EvolutionAction::BreakingChangeDetected)
358        }
359    }
360
361    /// Get the current inferred schema for an event type
362    pub fn get_schema(&self, event_type: &str) -> Option<InferredSchema> {
363        self.schemas.get(event_type).map(|s| s.clone())
364    }
365
366    /// Get the evolution history for an event type
367    pub fn get_history(&self, event_type: &str) -> Vec<EvolutionRecord> {
368        self.history
369            .get(event_type)
370            .map(|h| h.clone())
371            .unwrap_or_default()
372    }
373
374    /// Get all tracked event types
375    pub fn list_event_types(&self) -> Vec<String> {
376        self.schemas.iter().map(|e| e.key().clone()).collect()
377    }
378
379    /// Get the current version for an event type
380    pub fn get_version(&self, event_type: &str) -> Option<u32> {
381        self.versions.get(event_type).map(|v| *v)
382    }
383
384    /// Statistics
385    pub fn stats(&self) -> SchemaEvolutionStats {
386        let total_evolutions: usize = self.history.iter().map(|h| h.value().len()).sum();
387        let breaking_changes: usize = self
388            .history
389            .iter()
390            .map(|h| {
391                h.value()
392                    .iter()
393                    .filter(|r| r.action == EvolutionAction::BreakingChangeDetected)
394                    .count()
395            })
396            .sum();
397        SchemaEvolutionStats {
398            tracked_event_types: self.schemas.len(),
399            total_evolutions,
400            breaking_changes,
401        }
402    }
403}
404
/// Schema evolution statistics
///
/// Snapshot returned by `SchemaEvolutionManager::stats`.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaEvolutionStats {
    /// Number of event types that currently have a stored schema
    pub tracked_event_types: usize,
    /// Total number of history records across all event types (this counts
    /// initial inferences and breaking-change records too)
    pub total_evolutions: usize,
    /// Number of history records whose action is `BreakingChangeDetected`
    pub breaking_changes: usize,
}
412
// Unit tests for schema inference, diffing, and the evolution manager.
#[cfg(test)]
mod tests {
    use super::*;

    // Scalar top-level values map to the matching inferred types.
    #[test]
    fn test_infer_schema_basic() {
        let payload = serde_json::json!({"name": "Alice", "age": 30, "active": true});
        let schema = infer_schema(&payload);
        assert_eq!(schema.fields.len(), 3);
        assert_eq!(schema.fields["name"].inferred_type, InferredType::String);
        assert_eq!(schema.fields["age"].inferred_type, InferredType::Number);
        assert_eq!(schema.fields["active"].inferred_type, InferredType::Boolean);
    }

    // An object value records its direct members under `nested`.
    #[test]
    fn test_infer_schema_nested() {
        let payload = serde_json::json!({"address": {"city": "NYC", "zip": 10001}});
        let schema = infer_schema(&payload);
        let addr = &schema.fields["address"];
        assert_eq!(addr.inferred_type, InferredType::Object);
        assert_eq!(addr.nested.len(), 2);
        assert_eq!(addr.nested["city"].inferred_type, InferredType::String);
    }

    // The JSON Schema document is an object with typed property entries.
    #[test]
    fn test_to_json_schema() {
        let payload = serde_json::json!({"name": "Alice", "age": 30});
        let schema = infer_schema(&payload);
        let json_schema = to_json_schema(&schema);
        assert_eq!(json_schema["type"], "object");
        assert!(json_schema["properties"]["name"].is_object());
        assert_eq!(json_schema["properties"]["name"]["type"], "string");
    }

    // Diffing a schema against itself reports nothing and is compatible.
    #[test]
    fn test_compute_diff_no_changes() {
        let payload = serde_json::json!({"name": "Alice"});
        let schema = infer_schema(&payload);
        let diff = compute_diff(&schema, &schema);
        assert!(diff.added_fields.is_empty());
        assert!(diff.removed_fields.is_empty());
        assert!(diff.type_changes.is_empty());
        assert!(diff.is_backward_compatible);
    }

    // A new field is reported as added and does not break compatibility.
    #[test]
    fn test_compute_diff_added_field() {
        let old = infer_schema(&serde_json::json!({"name": "Alice"}));
        let new = infer_schema(&serde_json::json!({"name": "Alice", "email": "a@b.com"}));
        let diff = compute_diff(&old, &new);
        assert_eq!(diff.added_fields, vec!["email"]);
        assert!(diff.is_backward_compatible);
    }

    // number → string on the same field is a breaking type change.
    #[test]
    fn test_compute_diff_type_change() {
        let old = infer_schema(&serde_json::json!({"age": 30}));
        let new = infer_schema(&serde_json::json!({"age": "thirty"}));
        let diff = compute_diff(&old, &new);
        assert_eq!(diff.type_changes.len(), 1);
        assert_eq!(diff.type_changes[0].field, "age");
        assert!(!diff.is_backward_compatible);
    }

    // Dropping a non-nullable field is a breaking change.
    #[test]
    fn test_compute_diff_removed_required_field() {
        let old = infer_schema(&serde_json::json!({"name": "Alice", "age": 30}));
        let new = infer_schema(&serde_json::json!({"name": "Alice"}));
        let diff = compute_diff(&old, &new);
        assert_eq!(diff.removed_fields, vec!["age"]);
        assert!(!diff.is_backward_compatible);
    }

    // The first event of a type registers version 1.
    #[test]
    fn test_manager_first_event_infers() {
        let mgr = SchemaEvolutionManager::new();
        let action = mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        assert_eq!(action, Some(EvolutionAction::Inferred));
        assert_eq!(mgr.get_version("user.created"), Some(1));
    }

    // Same shape with different values produces no action.
    #[test]
    fn test_manager_same_schema_no_action() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        let action = mgr.analyze_event("user.created", &serde_json::json!({"name": "Bob"}));
        assert_eq!(action, None);
    }

    // A new field bumps the version and lands in the stored schema.
    #[test]
    fn test_manager_auto_evolve() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        let action = mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Bob", "email": "bob@example.com"}),
        );
        assert_eq!(action, Some(EvolutionAction::AutoEvolved));
        assert_eq!(mgr.get_version("user.created"), Some(2));
        // Email field should now be in the schema
        let schema = mgr.get_schema("user.created").unwrap();
        assert!(schema.fields.contains_key("email"));
    }

    // A type change is flagged and leaves the version untouched.
    #[test]
    fn test_manager_breaking_change() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Alice", "age": 30}),
        );
        let action = mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Bob", "age": "thirty"}),
        );
        assert_eq!(action, Some(EvolutionAction::BreakingChangeDetected));
        // Version should NOT have incremented
        assert_eq!(mgr.get_version("user.created"), Some(1));
    }

    // History keeps records in the order the actions happened.
    #[test]
    fn test_manager_history() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Bob", "email": "b@b.com"}),
        );
        let history = mgr.get_history("user.created");
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].action, EvolutionAction::Inferred);
        assert_eq!(history[1].action, EvolutionAction::AutoEvolved);
    }

    // Stats count initial inferences as evolutions; none are breaking here.
    #[test]
    fn test_manager_stats() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        mgr.analyze_event("order.placed", &serde_json::json!({"total": 99.99}));
        let stats = mgr.stats();
        assert_eq!(stats.tracked_event_types, 2);
        assert_eq!(stats.total_evolutions, 2);
        assert_eq!(stats.breaking_changes, 0);
    }

    // Every analyzed event type is listed (order is not asserted).
    #[test]
    fn test_manager_list_event_types() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        mgr.analyze_event("order.placed", &serde_json::json!({"total": 99.99}));
        let types = mgr.list_event_types();
        assert_eq!(types.len(), 2);
        assert!(types.contains(&"user.created".to_string()));
        assert!(types.contains(&"order.placed".to_string()));
    }

    // A field first seen as null may later carry a concrete type.
    #[test]
    fn test_null_field_compatible() {
        let old = infer_schema(&serde_json::json!({"name": "Alice", "bio": null}));
        let new = infer_schema(&serde_json::json!({"name": "Alice", "bio": "Hello"}));
        let diff = compute_diff(&old, &new);
        // null → string is not a breaking change (null is compatible with any type)
        assert!(diff.type_changes.is_empty());
        assert!(diff.is_backward_compatible);
    }
}
574        // null → string is not a breaking change (null is compatible with any type)
575        assert!(diff.type_changes.is_empty());
576        assert!(diff.is_backward_compatible);
577    }
578}