1#[cfg(feature = "native")]
8mod native {
9 use std::collections::HashMap;
10 use std::sync::Arc;
11 use tl_data::{ArrowDataType, ArrowField, ArrowSchema};
12
13 #[derive(Debug, Clone, Default)]
15 pub struct SchemaRegistry {
16 schemas: HashMap<String, Vec<VersionedSchema>>,
18 migrations: HashMap<(String, i64, i64), Vec<MigrationOp>>,
20 }
21
22 #[derive(Debug, Clone)]
24 pub struct VersionedSchema {
25 pub version: i64,
26 pub schema: Arc<ArrowSchema>,
27 pub metadata: SchemaMetadata,
28 }
29
/// Per-field annotations attached to a schema version, all keyed by field name.
#[derive(Default, Clone, Debug)]
pub struct SchemaMetadata {
    /// Field name -> version in which the field was introduced.
    pub field_since: HashMap<String, i64>,
    /// Field name -> a version number; presumably the version in which the field
    /// was deprecated (TODO confirm). Not consulted by the compatibility checks.
    pub field_deprecated: HashMap<String, i64>,
    /// Field name -> default value, stored verbatim as text. Only the presence of an
    /// entry matters to the forward-compatibility check.
    pub field_defaults: HashMap<String, String>,
}
40
/// Which compatibility rules a schema change is checked against.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompatibilityMode {
    /// Fields present in the older version must not be removed.
    Backward,
    /// Fields added in the newer version must declare a default.
    Forward,
    /// Both the `Backward` and the `Forward` rules.
    Full,
    /// No checks at all.
    None,
}
53
/// A single difference between two versions of a schema.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaDiff {
    /// A field that exists only in the newer version; `type_name` is the display
    /// form of its data type.
    FieldAdded { name: String, type_name: String },
    /// A field that exists only in the older version.
    FieldRemoved { name: String },
    /// A field renamed according to a recorded `RenameColumn` migration step.
    FieldRenamed { from: String, to: String },
    /// A field whose data type differs between the versions (display forms).
    TypeChanged {
        field: String,
        from: String,
        to: String,
    },
}
74
/// A rule violation found by a compatibility check.
#[derive(Clone, Debug, PartialEq)]
pub enum CompatIssue {
    /// A field of the older version is missing from the newer one.
    FieldRemovedNotBackward(String),
    /// A field new in the newer version has no default in that version's metadata.
    FieldAddedNoDefault(String),
    /// A field's type changed in a way that is not classified as widening
    /// (types in display form).
    TypeNarrowed {
        field: String,
        from: String,
        to: String,
    },
}
86
/// One step of a schema migration, applied in order by `apply_migration`.
#[derive(Clone, Debug)]
pub enum MigrationOp {
    /// Appends a column. `type_name` is a textual type name resolved to an Arrow
    /// type; `default`, when given, is stored verbatim in the schema metadata.
    AddColumn {
        name: String,
        type_name: String,
        default: Option<String>,
    },
    /// Removes a column.
    DropColumn { name: String },
    /// Renames a column, keeping its type and nullability.
    RenameColumn { from: String, to: String },
    /// Changes a column's type to the one named by `new_type`.
    AlterType { column: String, new_type: String },
}
107
108 impl SchemaRegistry {
109 pub fn new() -> Self {
110 Self::default()
111 }
112
113 pub fn register(
115 &mut self,
116 name: &str,
117 version: i64,
118 schema: Arc<ArrowSchema>,
119 metadata: SchemaMetadata,
120 ) -> Result<(), String> {
121 let entries = self.schemas.entry(name.to_string()).or_default();
122 if entries.iter().any(|e| e.version == version) {
123 return Err(format!(
124 "Schema `{}` version {} already registered",
125 name, version
126 ));
127 }
128 entries.push(VersionedSchema {
129 version,
130 schema,
131 metadata,
132 });
133 entries.sort_by_key(|e| e.version);
134 Ok(())
135 }
136
137 pub fn get(&self, name: &str, version: i64) -> Option<&VersionedSchema> {
139 self.schemas
140 .get(name)?
141 .iter()
142 .find(|e| e.version == version)
143 }
144
145 pub fn latest(&self, name: &str) -> Option<&VersionedSchema> {
147 self.schemas.get(name)?.last()
148 }
149
150 pub fn history(&self, name: &str) -> Vec<&VersionedSchema> {
152 self.schemas
153 .get(name)
154 .map(|v| v.iter().collect())
155 .unwrap_or_default()
156 }
157
158 pub fn versions(&self, name: &str) -> Vec<i64> {
160 self.schemas
161 .get(name)
162 .map(|v| v.iter().map(|e| e.version).collect())
163 .unwrap_or_default()
164 }
165
166 pub fn fields(&self, name: &str, version: i64) -> Vec<(String, String)> {
168 if let Some(vs) = self.get(name, version) {
169 vs.schema
170 .fields()
171 .iter()
172 .map(|f| (f.name().to_string(), format!("{}", f.data_type())))
173 .collect()
174 } else {
175 Vec::new()
176 }
177 }
178
179 pub fn diff(&self, name: &str, v1: i64, v2: i64) -> Vec<SchemaDiff> {
181 let s1 = match self.get(name, v1) {
182 Some(s) => s,
183 None => return Vec::new(),
184 };
185 let s2 = match self.get(name, v2) {
186 Some(s) => s,
187 None => return Vec::new(),
188 };
189
190 let mut diffs = Vec::new();
191 let old_fields: HashMap<&str, &ArrowField> = s1
192 .schema
193 .fields()
194 .iter()
195 .map(|f| (f.name().as_str(), f.as_ref()))
196 .collect();
197 let new_fields: HashMap<&str, &ArrowField> = s2
198 .schema
199 .fields()
200 .iter()
201 .map(|f| (f.name().as_str(), f.as_ref()))
202 .collect();
203
204 let renames = self.get_renames(name, v1, v2);
206
207 for name_str in old_fields.keys() {
209 if !new_fields.contains_key(name_str) {
210 if let Some(new_name) = renames.get(*name_str) {
212 diffs.push(SchemaDiff::FieldRenamed {
213 from: name_str.to_string(),
214 to: new_name.clone(),
215 });
216 } else {
217 diffs.push(SchemaDiff::FieldRemoved {
218 name: name_str.to_string(),
219 });
220 }
221 }
222 }
223
224 for (name_str, field) in &new_fields {
226 if !old_fields.contains_key(name_str) {
227 let is_rename_target = renames.values().any(|v| v == *name_str);
229 if !is_rename_target {
230 diffs.push(SchemaDiff::FieldAdded {
231 name: name_str.to_string(),
232 type_name: format!("{}", field.data_type()),
233 });
234 }
235 }
236 }
237
238 for (name_str, old_field) in &old_fields {
240 if let Some(new_field) = new_fields.get(name_str)
241 && old_field.data_type() != new_field.data_type()
242 {
243 diffs.push(SchemaDiff::TypeChanged {
244 field: name_str.to_string(),
245 from: format!("{}", old_field.data_type()),
246 to: format!("{}", new_field.data_type()),
247 });
248 }
249 }
250
251 diffs
252 }
253
254 pub fn check_compatibility(
256 &self,
257 name: &str,
258 old_ver: i64,
259 new_ver: i64,
260 mode: CompatibilityMode,
261 ) -> Vec<CompatIssue> {
262 if mode == CompatibilityMode::None {
263 return Vec::new();
264 }
265
266 let old_schema = match self.get(name, old_ver) {
267 Some(s) => s,
268 None => return Vec::new(),
269 };
270 let new_schema = match self.get(name, new_ver) {
271 Some(s) => s,
272 None => return Vec::new(),
273 };
274
275 let mut issues = Vec::new();
276 let old_fields: HashMap<&str, &ArrowField> = old_schema
277 .schema
278 .fields()
279 .iter()
280 .map(|f| (f.name().as_str(), f.as_ref()))
281 .collect();
282 let new_fields: HashMap<&str, &ArrowField> = new_schema
283 .schema
284 .fields()
285 .iter()
286 .map(|f| (f.name().as_str(), f.as_ref()))
287 .collect();
288
289 if mode == CompatibilityMode::Backward || mode == CompatibilityMode::Full {
291 for name_str in old_fields.keys() {
292 if !new_fields.contains_key(name_str) {
293 issues.push(CompatIssue::FieldRemovedNotBackward(name_str.to_string()));
294 }
295 }
296 }
297
298 if mode == CompatibilityMode::Forward || mode == CompatibilityMode::Full {
300 for name_str in new_fields.keys() {
301 if !old_fields.contains_key(name_str) {
302 let has_default =
304 new_schema.metadata.field_defaults.contains_key(*name_str);
305 if !has_default {
306 issues.push(CompatIssue::FieldAddedNoDefault(name_str.to_string()));
307 }
308 }
309 }
310 }
311
312 for (name_str, old_field) in &old_fields {
314 if let Some(new_field) = new_fields.get(name_str)
315 && old_field.data_type() != new_field.data_type()
316 && !is_type_widening(old_field.data_type(), new_field.data_type())
317 {
318 issues.push(CompatIssue::TypeNarrowed {
319 field: name_str.to_string(),
320 from: format!("{}", old_field.data_type()),
321 to: format!("{}", new_field.data_type()),
322 });
323 }
324 }
325
326 issues
327 }
328
329 pub fn register_migration(
331 &mut self,
332 schema_name: &str,
333 from_ver: i64,
334 to_ver: i64,
335 ops: Vec<MigrationOp>,
336 ) {
337 self.migrations
338 .insert((schema_name.to_string(), from_ver, to_ver), ops);
339 }
340
341 fn get_renames(&self, name: &str, from_ver: i64, to_ver: i64) -> HashMap<String, String> {
343 let mut renames = HashMap::new();
344 if let Some(ops) = self.migrations.get(&(name.to_string(), from_ver, to_ver)) {
345 for op in ops {
346 if let MigrationOp::RenameColumn { from, to } = op {
347 renames.insert(from.clone(), to.clone());
348 }
349 }
350 }
351 renames
352 }
353
354 pub fn apply_migration(
356 &mut self,
357 schema_name: &str,
358 from_ver: i64,
359 to_ver: i64,
360 ops: &[MigrationOp],
361 ) -> Result<(), String> {
362 let source = self
363 .get(schema_name, from_ver)
364 .ok_or_else(|| format!("Source schema `{}` v{} not found", schema_name, from_ver))?
365 .clone();
366
367 let mut fields: Vec<ArrowField> = source
368 .schema
369 .fields()
370 .iter()
371 .map(|f| f.as_ref().clone())
372 .collect();
373 let mut metadata = source.metadata.clone();
374
375 for op in ops {
376 match op {
377 MigrationOp::AddColumn {
378 name,
379 type_name,
380 default,
381 } => {
382 let dt = type_name_to_arrow(type_name);
383 fields.push(ArrowField::new(name, dt, true));
384 metadata.field_since.insert(name.clone(), to_ver);
385 if let Some(def) = default {
386 metadata.field_defaults.insert(name.clone(), def.clone());
387 }
388 }
389 MigrationOp::DropColumn { name } => {
390 fields.retain(|f| f.name() != name);
391 }
392 MigrationOp::RenameColumn { from, to } => {
393 for f in &mut fields {
394 if f.name() == from {
395 *f = ArrowField::new(to, f.data_type().clone(), f.is_nullable());
396 }
397 }
398 }
399 MigrationOp::AlterType { column, new_type } => {
400 let dt = type_name_to_arrow(new_type);
401 for f in &mut fields {
402 if f.name() == column {
403 *f = ArrowField::new(column, dt.clone(), f.is_nullable());
404 }
405 }
406 }
407 }
408 }
409
410 let new_schema = Arc::new(ArrowSchema::new(fields));
411 self.register(schema_name, to_ver, new_schema, metadata)?;
412 self.register_migration(schema_name, from_ver, to_ver, ops.to_vec());
413 Ok(())
414 }
415 }
416
417 fn is_type_widening(from: &ArrowDataType, to: &ArrowDataType) -> bool {
419 matches!(
420 (from, to),
421 (
422 ArrowDataType::Int8,
423 ArrowDataType::Int16
424 | ArrowDataType::Int32
425 | ArrowDataType::Int64
426 | ArrowDataType::Float32
427 | ArrowDataType::Float64
428 ) | (
429 ArrowDataType::Int16,
430 ArrowDataType::Int32
431 | ArrowDataType::Int64
432 | ArrowDataType::Float32
433 | ArrowDataType::Float64
434 ) | (
435 ArrowDataType::Int32,
436 ArrowDataType::Int64 | ArrowDataType::Float64
437 ) | (ArrowDataType::Float32, ArrowDataType::Float64)
438 )
439 }
440
441 pub fn type_name_to_arrow_pub(name: &str) -> ArrowDataType {
443 type_name_to_arrow(name)
444 }
445
446 fn type_name_to_arrow(name: &str) -> ArrowDataType {
448 match name {
449 "int8" => ArrowDataType::Int8,
450 "int16" => ArrowDataType::Int16,
451 "int32" | "int" => ArrowDataType::Int32,
452 "int64" => ArrowDataType::Int64,
453 "float32" | "float" => ArrowDataType::Float32,
454 "float64" => ArrowDataType::Float64,
455 "string" | "utf8" | "Utf8" => ArrowDataType::Utf8,
456 "bool" | "boolean" => ArrowDataType::Boolean,
457 _ => ArrowDataType::Utf8, }
459 }
460
461 impl CompatibilityMode {
462 pub fn from_str(s: &str) -> Self {
463 match s.to_lowercase().as_str() {
464 "backward" | "backwards" => CompatibilityMode::Backward,
465 "forward" | "forwards" => CompatibilityMode::Forward,
466 "full" => CompatibilityMode::Full,
467 "none" => CompatibilityMode::None,
468 _ => CompatibilityMode::Backward,
469 }
470 }
471 }
472
473 impl std::fmt::Display for SchemaDiff {
474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475 match self {
476 SchemaDiff::FieldAdded { name, type_name } => {
477 write!(f, "added field `{}` ({})", name, type_name)
478 }
479 SchemaDiff::FieldRemoved { name } => write!(f, "removed field `{}`", name),
480 SchemaDiff::FieldRenamed { from, to } => {
481 write!(f, "renamed field `{}` to `{}`", from, to)
482 }
483 SchemaDiff::TypeChanged { field, from, to } => {
484 write!(f, "changed type of `{}` from {} to {}", field, from, to)
485 }
486 }
487 }
488 }
489
490 impl std::fmt::Display for CompatIssue {
491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492 match self {
493 CompatIssue::FieldRemovedNotBackward(name) => {
494 write!(f, "field `{}` removed (breaks backward compat)", name)
495 }
496 CompatIssue::FieldAddedNoDefault(name) => write!(
497 f,
498 "field `{}` added without default (breaks forward compat)",
499 name
500 ),
501 CompatIssue::TypeNarrowed { field, from, to } => {
502 write!(f, "field `{}` type narrowed from {} to {}", field, from, to)
503 }
504 }
505 }
506 }
507
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a shared schema of nullable fields from `(name, type)` pairs.
    fn make_schema(fields: &[(&str, ArrowDataType)]) -> Arc<ArrowSchema> {
        let arrow_fields: Vec<ArrowField> = fields
            .iter()
            .map(|(n, dt)| ArrowField::new(*n, dt.clone(), true))
            .collect();
        Arc::new(ArrowSchema::new(arrow_fields))
    }

    #[test]
    fn test_register_schema_v1() {
        let mut reg = SchemaRegistry::new();
        let schema = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        assert!(reg.register("User", 1, schema, SchemaMetadata::default()).is_ok());
        assert!(reg.get("User", 1).is_some());
    }

    #[test]
    fn test_register_schema_v2() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64), ("email", ArrowDataType::Utf8)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        assert!(reg.get("User", 2).is_some());
    }

    #[test]
    fn test_get_specific_version() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let v1 = reg.get("User", 1).unwrap();
        assert_eq!(v1.schema.fields().len(), 1);
        let v2 = reg.get("User", 2).unwrap();
        assert_eq!(v2.schema.fields().len(), 2);
    }

    #[test]
    fn test_get_latest() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let latest = reg.latest("User").unwrap();
        assert_eq!(latest.version, 2);
    }

    #[test]
    fn test_history_ordered() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        let s3 = make_schema(&[
            ("id", ArrowDataType::Int64),
            ("name", ArrowDataType::Utf8),
            ("email", ArrowDataType::Utf8),
        ]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 3, s3, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let hist = reg.history("User");
        let versions: Vec<i64> = hist.iter().map(|v| v.version).collect();
        assert_eq!(versions, vec![1, 2, 3]);
        assert_eq!(reg.versions("User"), vec![1, 2, 3]);
    }

    #[test]
    fn test_backward_compat_adding_column_ok() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Backward);
        assert!(
            issues.is_empty(),
            "Adding column should be backward compatible, got: {:?}",
            issues
        );
    }

    #[test]
    fn test_backward_compat_removing_column_fails() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Backward);
        assert!(!issues.is_empty());
        assert!(matches!(&issues[0], CompatIssue::FieldRemovedNotBackward(n) if n == "name"));
    }

    #[test]
    fn test_backward_compat_type_widening_ok() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int32)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
        reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("T", 2, s2, SchemaMetadata::default()).unwrap();
        let issues = reg.check_compatibility("T", 1, 2, CompatibilityMode::Backward);
        assert!(
            issues.is_empty(),
            "Type widening Int32->Int64 should be backward compatible"
        );
    }

    #[test]
    fn test_backward_compat_type_narrowing_fails() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int32)]);
        reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("T", 2, s2, SchemaMetadata::default()).unwrap();
        let issues = reg.check_compatibility("T", 1, 2, CompatibilityMode::Backward);
        assert!(!issues.is_empty());
        assert!(matches!(&issues[0], CompatIssue::TypeNarrowed { .. }));
    }

    #[test]
    fn test_forward_compat_removing_column_ok() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Forward);
        assert!(
            issues.is_empty(),
            "Removing column should be forward compatible"
        );
    }

    #[test]
    fn test_forward_compat_adding_without_default_fails() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let issues = reg.check_compatibility("User", 1, 2, CompatibilityMode::Forward);
        assert!(!issues.is_empty());
        assert!(matches!(&issues[0], CompatIssue::FieldAddedNoDefault(n) if n == "name"));
    }

    #[test]
    fn test_full_compat() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int32)]);
        let mut meta = SchemaMetadata::default();
        meta.field_defaults
            .insert("name".to_string(), "\"\"".to_string());
        let s2 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("T", 2, s2, meta).unwrap();
        let issues = reg.check_compatibility("T", 1, 2, CompatibilityMode::Full);
        assert!(
            issues.is_empty(),
            "Type widening + defaults should pass full compat, got: {:?}",
            issues
        );
    }

    #[test]
    fn test_none_mode_reports_nothing() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int32)]);
        reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("T", 2, s2, SchemaMetadata::default()).unwrap();
        assert!(reg.check_compatibility("T", 1, 2, CompatibilityMode::None).is_empty());
    }

    #[test]
    fn test_missing_versions_yield_empty_results() {
        let mut reg = SchemaRegistry::new();
        reg.register("T", 1, make_schema(&[("id", ArrowDataType::Int64)]), SchemaMetadata::default())
            .unwrap();
        assert!(reg.check_compatibility("T", 1, 9, CompatibilityMode::Full).is_empty());
        assert!(reg.diff("T", 1, 9).is_empty());
        assert!(reg.fields("T", 9).is_empty());
        assert!(reg.history("Ghost").is_empty());
        assert!(reg.latest("Ghost").is_none());
    }

    #[test]
    fn test_diff_additions_removals() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[
            ("id", ArrowDataType::Int64),
            ("old_field", ArrowDataType::Utf8),
        ]);
        let s2 = make_schema(&[
            ("id", ArrowDataType::Int64),
            ("new_field", ArrowDataType::Utf8),
        ]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        reg.register("User", 2, s2, SchemaMetadata::default()).unwrap();
        let diffs = reg.diff("User", 1, 2);
        assert!(
            diffs
                .iter()
                .any(|d| matches!(d, SchemaDiff::FieldRemoved { name } if name == "old_field"))
        );
        assert!(
            diffs.iter().any(
                |d| matches!(d, SchemaDiff::FieldAdded { name, .. } if name == "new_field")
            )
        );
    }

    #[test]
    fn test_diff_type_changed() {
        let mut reg = SchemaRegistry::new();
        reg.register("T", 1, make_schema(&[("id", ArrowDataType::Int32)]), SchemaMetadata::default())
            .unwrap();
        reg.register("T", 2, make_schema(&[("id", ArrowDataType::Int64)]), SchemaMetadata::default())
            .unwrap();
        let diffs = reg.diff("T", 1, 2);
        assert_eq!(diffs.len(), 1);
        assert!(matches!(&diffs[0], SchemaDiff::TypeChanged { field, .. } if field == "id"));
    }

    #[test]
    fn test_duplicate_version_error() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64)]);
        let s2 = make_schema(&[("id", ArrowDataType::Int64)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        let result = reg.register("User", 1, s2, SchemaMetadata::default());
        assert!(result.is_err());
    }

    #[test]
    fn test_apply_migration_add_column() {
        let mut reg = SchemaRegistry::new();
        reg.register("User", 1, make_schema(&[("id", ArrowDataType::Int64)]), SchemaMetadata::default())
            .unwrap();
        let ops = vec![MigrationOp::AddColumn {
            name: "email".to_string(),
            type_name: "string".to_string(),
            default: Some("\"\"".to_string()),
        }];
        reg.apply_migration("User", 1, 2, &ops).unwrap();

        let names: Vec<String> = reg.fields("User", 2).into_iter().map(|(n, _)| n).collect();
        assert_eq!(names, vec!["id", "email"]);
        let v2 = reg.get("User", 2).unwrap();
        assert_eq!(v2.metadata.field_since.get("email"), Some(&2));
        assert_eq!(
            v2.metadata.field_defaults.get("email").map(String::as_str),
            Some("\"\"")
        );
        assert!(reg.check_compatibility("User", 1, 2, CompatibilityMode::Forward).is_empty());
    }

    #[test]
    fn test_apply_migration_rename_is_reported_by_diff() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int64), ("name", ArrowDataType::Utf8)]);
        reg.register("User", 1, s1, SchemaMetadata::default()).unwrap();
        let ops = vec![MigrationOp::RenameColumn {
            from: "name".to_string(),
            to: "full_name".to_string(),
        }];
        reg.apply_migration("User", 1, 2, &ops).unwrap();
        assert_eq!(
            reg.diff("User", 1, 2),
            vec![SchemaDiff::FieldRenamed {
                from: "name".to_string(),
                to: "full_name".to_string(),
            }]
        );
    }

    #[test]
    fn test_apply_migration_drop_and_alter() {
        let mut reg = SchemaRegistry::new();
        let s1 = make_schema(&[("id", ArrowDataType::Int32), ("legacy", ArrowDataType::Utf8)]);
        reg.register("T", 1, s1, SchemaMetadata::default()).unwrap();
        let ops = vec![
            MigrationOp::DropColumn {
                name: "legacy".to_string(),
            },
            MigrationOp::AlterType {
                column: "id".to_string(),
                new_type: "int64".to_string(),
            },
        ];
        reg.apply_migration("T", 1, 2, &ops).unwrap();

        let diffs = reg.diff("T", 1, 2);
        assert!(diffs
            .iter()
            .any(|d| matches!(d, SchemaDiff::FieldRemoved { name } if name == "legacy")));
        assert!(diffs
            .iter()
            .any(|d| matches!(d, SchemaDiff::TypeChanged { field, .. } if field == "id")));
        assert_eq!(
            reg.check_compatibility("T", 1, 2, CompatibilityMode::Backward),
            vec![CompatIssue::FieldRemovedNotBackward("legacy".to_string())]
        );
    }

    #[test]
    fn test_apply_migration_errors() {
        let mut reg = SchemaRegistry::new();
        assert!(reg.apply_migration("Ghost", 1, 2, &[]).is_err());

        reg.register("T", 1, make_schema(&[("id", ArrowDataType::Int64)]), SchemaMetadata::default())
            .unwrap();
        reg.register("T", 2, make_schema(&[("id", ArrowDataType::Int64)]), SchemaMetadata::default())
            .unwrap();
        // Target version already exists.
        assert!(reg.apply_migration("T", 1, 2, &[]).is_err());
    }

    #[test]
    fn test_compatibility_mode_from_str() {
        assert_eq!(CompatibilityMode::from_str("FULL"), CompatibilityMode::Full);
        assert_eq!(CompatibilityMode::from_str("forwards"), CompatibilityMode::Forward);
        assert_eq!(CompatibilityMode::from_str("None"), CompatibilityMode::None);
        assert_eq!(CompatibilityMode::from_str("backward"), CompatibilityMode::Backward);
        assert_eq!(CompatibilityMode::from_str("bogus"), CompatibilityMode::Backward);
    }

    #[test]
    fn test_display_messages() {
        assert_eq!(
            SchemaDiff::FieldRemoved { name: "x".to_string() }.to_string(),
            "removed field `x`"
        );
        assert_eq!(
            SchemaDiff::FieldRenamed { from: "a".to_string(), to: "b".to_string() }.to_string(),
            "renamed field `a` to `b`"
        );
        assert_eq!(
            CompatIssue::FieldAddedNoDefault("y".to_string()).to_string(),
            "field `y` added without default (breaks forward compat)"
        );
    }

    #[test]
    fn test_type_name_mapping_and_widening() {
        assert_eq!(type_name_to_arrow("int"), ArrowDataType::Int32);
        assert_eq!(type_name_to_arrow("boolean"), ArrowDataType::Boolean);
        assert_eq!(type_name_to_arrow("no-such-type"), ArrowDataType::Utf8);
        assert_eq!(type_name_to_arrow_pub("float64"), ArrowDataType::Float64);

        assert!(is_type_widening(&ArrowDataType::Int8, &ArrowDataType::Float32));
        assert!(is_type_widening(&ArrowDataType::Int32, &ArrowDataType::Float64));
        assert!(!is_type_widening(&ArrowDataType::Int32, &ArrowDataType::Float32));
        assert!(!is_type_widening(&ArrowDataType::Int64, &ArrowDataType::Int32));
        assert!(!is_type_widening(&ArrowDataType::Int64, &ArrowDataType::Int64));
    }
}
737}
738
739#[cfg(feature = "native")]
740pub use native::*;
741
/// Stateless stand-in for the schema registry when the `native` feature is off.
#[cfg(not(feature = "native"))]
#[derive(Default, Clone, Debug)]
pub struct SchemaRegistry;

#[cfg(not(feature = "native"))]
impl SchemaRegistry {
    /// Returns the (stateless) placeholder registry.
    pub fn new() -> Self {
        Self
    }
}
752}