1use chrono::{DateTime, Utc};
28use dashmap::DashMap;
29use serde::{Deserialize, Serialize};
30use serde_json::Value as JsonValue;
31use std::collections::{BTreeMap, BTreeSet};
32use uuid::Uuid;
33
/// Outcome of reconciling an incoming event payload with the schema tracked for its type.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum EvolutionAction {
    /// First payload seen for an event type; its inferred schema was stored as version 1.
    Inferred,
    /// A backward-compatible change (new fields) was merged and the version was bumped.
    AutoEvolved,
    /// An incompatible change (a type change or a removed required field) was observed;
    /// the stored schema and version were left untouched.
    BreakingChangeDetected,
}
44
/// One entry in an event type's schema-evolution history, appended by
/// `SchemaEvolutionManager::analyze_event`.
#[derive(Debug, Clone, Serialize)]
pub struct EvolutionRecord {
    /// Unique identifier of this record (a random v4 UUID).
    pub id: Uuid,
    /// Event type the record belongs to.
    pub event_type: String,
    /// What happened to the schema.
    pub action: EvolutionAction,
    /// Schema version before the change; `None` when the schema was first inferred.
    pub from_version: Option<u32>,
    /// Schema version after the change; `None` when a breaking change was only detected.
    pub to_version: Option<u32>,
    /// Top-level fields present in the payload but not in the stored schema.
    pub added_fields: Vec<String>,
    /// Top-level fields missing from the payload (filled in for breaking changes only).
    pub removed_fields: Vec<String>,
    /// Top-level fields whose type changed (filled in for breaking changes only).
    pub type_changes: Vec<FieldTypeChange>,
    /// When the record was created (UTC).
    pub timestamp: DateTime<Utc>,
}
58
/// A top-level field whose inferred type differs between two schemas.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FieldTypeChange {
    /// Name of the field.
    pub field: String,
    /// Type name in the existing schema (e.g. `"number"`).
    pub old_type: String,
    /// Type name in the new schema (e.g. `"string"`).
    pub new_type: String,
}
66
/// JSON type of a value as observed in a payload (one variant per JSON value kind).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum InferredType {
    /// JSON string.
    String,
    /// JSON number (integer or floating point).
    Number,
    /// JSON `true` / `false`.
    Boolean,
    /// JSON array (element types are not inspected).
    Array,
    /// JSON object.
    Object,
    /// JSON `null`.
    Null,
}
77
78impl std::fmt::Display for InferredType {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 InferredType::String => write!(f, "string"),
82 InferredType::Number => write!(f, "number"),
83 InferredType::Boolean => write!(f, "boolean"),
84 InferredType::Array => write!(f, "array"),
85 InferredType::Object => write!(f, "object"),
86 InferredType::Null => write!(f, "null"),
87 }
88 }
89}
90
91fn infer_type(value: &JsonValue) -> InferredType {
93 match value {
94 JsonValue::Null => InferredType::Null,
95 JsonValue::Bool(_) => InferredType::Boolean,
96 JsonValue::Number(_) => InferredType::Number,
97 JsonValue::String(_) => InferredType::String,
98 JsonValue::Array(_) => InferredType::Array,
99 JsonValue::Object(_) => InferredType::Object,
100 }
101}
102
/// Inferred schema of a single field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldSchema {
    /// Field name (same as its key in the parent map).
    pub name: String,
    /// JSON type observed for the field.
    pub inferred_type: InferredType,
    /// Whether the field is optional. It is set when the observed value was `null`, or when
    /// the field was merged in by a later backward-compatible evolution. Nullable fields are
    /// left out of `required` and may disappear without breaking compatibility.
    pub nullable: bool,
    /// Child fields of an `Object` field, as populated by `infer_schema`; empty for other
    /// types.
    pub nested: BTreeMap<String, FieldSchema>,
}
112
/// Schema inferred from a JSON object payload: one entry per top-level key, ordered by name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InferredSchema {
    /// Top-level fields keyed by field name.
    pub fields: BTreeMap<String, FieldSchema>,
}
118
/// Top-level differences between a stored schema and a newly inferred one, as computed by
/// `compute_diff`.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaDiff {
    /// Fields present only in the new schema, sorted by name.
    pub added_fields: Vec<String>,
    /// Fields present only in the existing schema, sorted by name.
    pub removed_fields: Vec<String>,
    /// Fields whose type changed; transitions to or from `null` are not included.
    pub type_changes: Vec<FieldTypeChange>,
    /// `true` when there are no type changes and no removed field was required
    /// (non-nullable).
    pub is_backward_compatible: bool,
}
127
128pub fn infer_schema(payload: &JsonValue) -> InferredSchema {
130 let mut fields = BTreeMap::new();
131 if let Some(obj) = payload.as_object() {
132 for (key, value) in obj {
133 let mut nested = BTreeMap::new();
134 if let Some(inner_obj) = value.as_object() {
135 for (nk, nv) in inner_obj {
136 nested.insert(
137 nk.clone(),
138 FieldSchema {
139 name: nk.clone(),
140 inferred_type: infer_type(nv),
141 nullable: nv.is_null(),
142 nested: BTreeMap::new(),
143 },
144 );
145 }
146 }
147 fields.insert(
148 key.clone(),
149 FieldSchema {
150 name: key.clone(),
151 inferred_type: infer_type(value),
152 nullable: value.is_null(),
153 nested,
154 },
155 );
156 }
157 }
158 InferredSchema { fields }
159}
160
161pub fn to_json_schema(schema: &InferredSchema) -> JsonValue {
163 let mut properties = serde_json::Map::new();
164 let mut required = Vec::new();
165
166 for (name, field) in &schema.fields {
167 let type_str = match field.inferred_type {
168 InferredType::String => "string",
169 InferredType::Number => "number",
170 InferredType::Boolean => "boolean",
171 InferredType::Array => "array",
172 InferredType::Object => "object",
173 InferredType::Null => "null",
174 };
175
176 let mut prop = serde_json::json!({"type": type_str});
177
178 if field.inferred_type == InferredType::Object && !field.nested.is_empty() {
180 let nested_schema = InferredSchema {
181 fields: field.nested.clone(),
182 };
183 let nested_json = to_json_schema(&nested_schema);
184 if let Some(np) = nested_json.get("properties") {
185 prop["properties"] = np.clone();
186 }
187 }
188
189 properties.insert(name.clone(), prop);
190 if !field.nullable {
191 required.push(JsonValue::String(name.clone()));
192 }
193 }
194
195 serde_json::json!({
196 "type": "object",
197 "properties": properties,
198 "required": required,
199 })
200}
201
202pub fn compute_diff(existing: &InferredSchema, new: &InferredSchema) -> SchemaDiff {
204 let existing_keys: BTreeSet<&String> = existing.fields.keys().collect();
205 let new_keys: BTreeSet<&String> = new.fields.keys().collect();
206
207 let added: Vec<String> = new_keys
208 .difference(&existing_keys)
209 .map(|k| (*k).clone())
210 .collect();
211 let removed: Vec<String> = existing_keys
212 .difference(&new_keys)
213 .map(|k| (*k).clone())
214 .collect();
215
216 let mut type_changes = Vec::new();
217 for key in existing_keys.intersection(&new_keys) {
218 let old_field = &existing.fields[*key];
219 let new_field = &new.fields[*key];
220 if old_field.inferred_type != new_field.inferred_type
221 && old_field.inferred_type != InferredType::Null
222 && new_field.inferred_type != InferredType::Null
223 {
224 type_changes.push(FieldTypeChange {
225 field: (*key).clone(),
226 old_type: old_field.inferred_type.to_string(),
227 new_type: new_field.inferred_type.to_string(),
228 });
229 }
230 }
231
232 let removed_required = removed
234 .iter()
235 .any(|r| existing.fields.get(r).is_some_and(|f| !f.nullable));
236
237 let is_backward_compatible = type_changes.is_empty() && !removed_required;
238
239 SchemaDiff {
240 added_fields: added,
241 removed_fields: removed,
242 type_changes,
243 is_backward_compatible,
244 }
245}
246
/// Tracks, per event type, the current inferred schema, its version number and the history
/// of evolution records.
///
/// State is kept in `DashMap`s (concurrent hash maps) keyed by event-type name, and every
/// method takes `&self`.
pub struct SchemaEvolutionManager {
    /// Current schema for each event type.
    schemas: DashMap<String, InferredSchema>,
    /// Evolution records for each event type, in the order they were appended.
    history: DashMap<String, Vec<EvolutionRecord>>,
    /// Current schema version for each event type (the first inferred schema is version 1).
    versions: DashMap<String, u32>,
}
256
257impl Default for SchemaEvolutionManager {
258 fn default() -> Self {
259 Self::new()
260 }
261}
262
263impl SchemaEvolutionManager {
264 pub fn new() -> Self {
265 Self {
266 schemas: DashMap::new(),
267 history: DashMap::new(),
268 versions: DashMap::new(),
269 }
270 }
271
272 pub fn analyze_event(&self, event_type: &str, payload: &JsonValue) -> Option<EvolutionAction> {
276 let new_schema = infer_schema(payload);
277
278 if !self.schemas.contains_key(event_type) {
279 self.schemas.insert(event_type.to_string(), new_schema);
281 self.versions.insert(event_type.to_string(), 1);
282 self.history
283 .entry(event_type.to_string())
284 .or_default()
285 .push(EvolutionRecord {
286 id: Uuid::new_v4(),
287 event_type: event_type.to_string(),
288 action: EvolutionAction::Inferred,
289 from_version: None,
290 to_version: Some(1),
291 added_fields: vec![],
292 removed_fields: vec![],
293 type_changes: vec![],
294 timestamp: Utc::now(),
295 });
296 return Some(EvolutionAction::Inferred);
297 }
298
299 let existing = self.schemas.get(event_type).unwrap().clone();
301 let diff = compute_diff(&existing, &new_schema);
302
303 if diff.added_fields.is_empty()
305 && diff.removed_fields.is_empty()
306 && diff.type_changes.is_empty()
307 {
308 return None;
309 }
310
311 let current_version = *self.versions.get(event_type).unwrap();
312
313 if diff.is_backward_compatible {
314 let mut merged = existing;
316 for (key, field) in new_schema.fields {
317 merged.fields.entry(key).or_insert(FieldSchema {
318 nullable: true, ..field
320 });
321 }
322
323 let new_version = current_version + 1;
324 self.schemas.insert(event_type.to_string(), merged);
325 self.versions.insert(event_type.to_string(), new_version);
326 self.history
327 .entry(event_type.to_string())
328 .or_default()
329 .push(EvolutionRecord {
330 id: Uuid::new_v4(),
331 event_type: event_type.to_string(),
332 action: EvolutionAction::AutoEvolved,
333 from_version: Some(current_version),
334 to_version: Some(new_version),
335 added_fields: diff.added_fields,
336 removed_fields: vec![],
337 type_changes: vec![],
338 timestamp: Utc::now(),
339 });
340 Some(EvolutionAction::AutoEvolved)
341 } else {
342 self.history
344 .entry(event_type.to_string())
345 .or_default()
346 .push(EvolutionRecord {
347 id: Uuid::new_v4(),
348 event_type: event_type.to_string(),
349 action: EvolutionAction::BreakingChangeDetected,
350 from_version: Some(current_version),
351 to_version: None,
352 added_fields: diff.added_fields,
353 removed_fields: diff.removed_fields,
354 type_changes: diff.type_changes,
355 timestamp: Utc::now(),
356 });
357 Some(EvolutionAction::BreakingChangeDetected)
358 }
359 }
360
361 pub fn get_schema(&self, event_type: &str) -> Option<InferredSchema> {
363 self.schemas.get(event_type).map(|s| s.clone())
364 }
365
366 pub fn get_history(&self, event_type: &str) -> Vec<EvolutionRecord> {
368 self.history
369 .get(event_type)
370 .map(|h| h.clone())
371 .unwrap_or_default()
372 }
373
374 pub fn list_event_types(&self) -> Vec<String> {
376 self.schemas.iter().map(|e| e.key().clone()).collect()
377 }
378
379 pub fn get_version(&self, event_type: &str) -> Option<u32> {
381 self.versions.get(event_type).map(|v| *v)
382 }
383
384 pub fn stats(&self) -> SchemaEvolutionStats {
386 let total_evolutions: usize = self.history.iter().map(|h| h.value().len()).sum();
387 let breaking_changes: usize = self
388 .history
389 .iter()
390 .map(|h| {
391 h.value()
392 .iter()
393 .filter(|r| r.action == EvolutionAction::BreakingChangeDetected)
394 .count()
395 })
396 .sum();
397 SchemaEvolutionStats {
398 tracked_event_types: self.schemas.len(),
399 total_evolutions,
400 breaking_changes,
401 }
402 }
403}
404
/// Aggregate counters returned by `SchemaEvolutionManager::stats`.
#[derive(Debug, Clone, Serialize)]
pub struct SchemaEvolutionStats {
    /// Number of event types with a stored schema.
    pub tracked_event_types: usize,
    /// Total number of history records across all event types (inferences, auto-evolutions
    /// and detected breaking changes).
    pub total_evolutions: usize,
    /// Number of `BreakingChangeDetected` records across all event types.
    pub breaking_changes: usize,
}
412
// Unit tests for schema inference, JSON Schema rendering, diffing and the evolution manager.
#[cfg(test)]
mod tests {
    use super::*;

    // Scalar top-level values map to their JSON types.
    #[test]
    fn test_infer_schema_basic() {
        let payload = serde_json::json!({"name": "Alice", "age": 30, "active": true});
        let schema = infer_schema(&payload);
        assert_eq!(schema.fields.len(), 3);
        assert_eq!(schema.fields["name"].inferred_type, InferredType::String);
        assert_eq!(schema.fields["age"].inferred_type, InferredType::Number);
        assert_eq!(schema.fields["active"].inferred_type, InferredType::Boolean);
    }

    // Object-valued fields get their children recorded in `nested`.
    #[test]
    fn test_infer_schema_nested() {
        let payload = serde_json::json!({"address": {"city": "NYC", "zip": 10001}});
        let schema = infer_schema(&payload);
        let addr = &schema.fields["address"];
        assert_eq!(addr.inferred_type, InferredType::Object);
        assert_eq!(addr.nested.len(), 2);
        assert_eq!(addr.nested["city"].inferred_type, InferredType::String);
    }

    // Rendering produces an object schema with one property per field.
    #[test]
    fn test_to_json_schema() {
        let payload = serde_json::json!({"name": "Alice", "age": 30});
        let schema = infer_schema(&payload);
        let json_schema = to_json_schema(&schema);
        assert_eq!(json_schema["type"], "object");
        assert!(json_schema["properties"]["name"].is_object());
        assert_eq!(json_schema["properties"]["name"]["type"], "string");
    }

    // A schema diffed against itself is empty and compatible.
    #[test]
    fn test_compute_diff_no_changes() {
        let payload = serde_json::json!({"name": "Alice"});
        let schema = infer_schema(&payload);
        let diff = compute_diff(&schema, &schema);
        assert!(diff.added_fields.is_empty());
        assert!(diff.removed_fields.is_empty());
        assert!(diff.type_changes.is_empty());
        assert!(diff.is_backward_compatible);
    }

    // Adding a field is backward compatible.
    #[test]
    fn test_compute_diff_added_field() {
        let old = infer_schema(&serde_json::json!({"name": "Alice"}));
        let new = infer_schema(&serde_json::json!({"name": "Alice", "email": "a@b.com"}));
        let diff = compute_diff(&old, &new);
        assert_eq!(diff.added_fields, vec!["email"]);
        assert!(diff.is_backward_compatible);
    }

    // Changing a field's non-null type is breaking.
    #[test]
    fn test_compute_diff_type_change() {
        let old = infer_schema(&serde_json::json!({"age": 30}));
        let new = infer_schema(&serde_json::json!({"age": "thirty"}));
        let diff = compute_diff(&old, &new);
        assert_eq!(diff.type_changes.len(), 1);
        assert_eq!(diff.type_changes[0].field, "age");
        assert!(!diff.is_backward_compatible);
    }

    // Dropping a non-nullable field is breaking.
    #[test]
    fn test_compute_diff_removed_required_field() {
        let old = infer_schema(&serde_json::json!({"name": "Alice", "age": 30}));
        let new = infer_schema(&serde_json::json!({"name": "Alice"}));
        let diff = compute_diff(&old, &new);
        assert_eq!(diff.removed_fields, vec!["age"]);
        assert!(!diff.is_backward_compatible);
    }

    // The first event of a type is stored as version 1.
    #[test]
    fn test_manager_first_event_infers() {
        let mgr = SchemaEvolutionManager::new();
        let action = mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        assert_eq!(action, Some(EvolutionAction::Inferred));
        assert_eq!(mgr.get_version("user.created"), Some(1));
    }

    // Same shape, different values: no action.
    #[test]
    fn test_manager_same_schema_no_action() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        let action = mgr.analyze_event("user.created", &serde_json::json!({"name": "Bob"}));
        assert_eq!(action, None);
    }

    // A new field auto-evolves the schema and bumps the version.
    #[test]
    fn test_manager_auto_evolve() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        let action = mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Bob", "email": "bob@example.com"}),
        );
        assert_eq!(action, Some(EvolutionAction::AutoEvolved));
        assert_eq!(mgr.get_version("user.created"), Some(2));
        let schema = mgr.get_schema("user.created").unwrap();
        assert!(schema.fields.contains_key("email"));
    }

    // A type change is reported as breaking and leaves the version untouched.
    #[test]
    fn test_manager_breaking_change() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Alice", "age": 30}),
        );
        let action = mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Bob", "age": "thirty"}),
        );
        assert_eq!(action, Some(EvolutionAction::BreakingChangeDetected));
        assert_eq!(mgr.get_version("user.created"), Some(1));
    }

    // History records appear in the order the actions happened.
    #[test]
    fn test_manager_history() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        mgr.analyze_event(
            "user.created",
            &serde_json::json!({"name": "Bob", "email": "b@b.com"}),
        );
        let history = mgr.get_history("user.created");
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].action, EvolutionAction::Inferred);
        assert_eq!(history[1].action, EvolutionAction::AutoEvolved);
    }

    // Stats count tracked types and all history records.
    #[test]
    fn test_manager_stats() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        mgr.analyze_event("order.placed", &serde_json::json!({"total": 99.99}));
        let stats = mgr.stats();
        assert_eq!(stats.tracked_event_types, 2);
        assert_eq!(stats.total_evolutions, 2);
        assert_eq!(stats.breaking_changes, 0);
    }

    // Every analyzed event type is listed.
    #[test]
    fn test_manager_list_event_types() {
        let mgr = SchemaEvolutionManager::new();
        mgr.analyze_event("user.created", &serde_json::json!({"name": "Alice"}));
        mgr.analyze_event("order.placed", &serde_json::json!({"total": 99.99}));
        let types = mgr.list_event_types();
        assert_eq!(types.len(), 2);
        assert!(types.contains(&"user.created".to_string()));
        assert!(types.contains(&"order.placed".to_string()));
    }

    // A null -> string transition is not a type change.
    #[test]
    fn test_null_field_compatible() {
        let old = infer_schema(&serde_json::json!({"name": "Alice", "bio": null}));
        let new = infer_schema(&serde_json::json!({"name": "Alice", "bio": "Hello"}));
        let diff = compute_diff(&old, &new);
        assert!(diff.type_changes.is_empty());
        assert!(diff.is_backward_compatible);
    }
}