1use serde::{Deserialize, Serialize};
26
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
44#[serde(rename_all = "kebab-case")]
45pub struct Schema {
46 pub schema_id: i32,
48
49 #[serde(default)]
51 pub fields: Vec<Field>,
52
53 #[serde(default, skip_serializing_if = "Vec::is_empty")]
55 pub identifier_field_ids: Vec<i32>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64#[serde(rename_all = "kebab-case")]
65pub struct Field {
66 pub id: i32,
69
70 pub name: String,
72
73 pub required: bool,
75
76 #[serde(rename = "type")]
78 pub field_type: Type,
79
80 #[serde(skip_serializing_if = "Option::is_none")]
82 pub doc: Option<String>,
83
84 #[serde(skip_serializing_if = "Option::is_none")]
86 pub default: Option<serde_json::Value>,
87}
88
89impl Field {
90 pub fn to_arrow_field(&self) -> arrow::datatypes::Field {
92 arrow::datatypes::Field::new(
93 &self.name,
94 self.field_type.to_arrow_datatype(),
95 !self.required,
96 )
97 }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
105#[serde(tag = "type", rename_all = "lowercase")]
106pub enum Type {
107 Boolean,
109
110 Int,
112
113 Long,
115
116 Float,
118
119 Double,
121
122 Date,
124
125 Time,
127
128 #[serde(rename_all = "kebab-case")]
130 Timestamp {
131 with_timezone: bool,
133 },
134
135 String,
137
138 Uuid,
140
141 Binary,
143
144 #[serde(rename_all = "kebab-case")]
146 Fixed {
147 length: u32,
149 },
150
151 #[serde(rename_all = "kebab-case")]
153 Decimal {
154 precision: u32,
156 scale: u32,
158 },
159
160 #[serde(rename_all = "kebab-case")]
162 Struct {
163 fields: Vec<Field>,
165 },
166
167 #[serde(rename_all = "kebab-case")]
169 List {
170 element: Box<Field>,
172 },
173
174 #[serde(rename_all = "kebab-case")]
176 Map {
177 key: Box<Field>,
179 value: Box<Field>,
181 },
182}
183
184impl Type {
185 pub fn to_arrow_datatype(&self) -> arrow::datatypes::DataType {
187 match self {
188 Type::Boolean => arrow::datatypes::DataType::Boolean,
189 Type::Int => arrow::datatypes::DataType::Int32,
190 Type::Long => arrow::datatypes::DataType::Int64,
191 Type::Float => arrow::datatypes::DataType::Float32,
192 Type::Double => arrow::datatypes::DataType::Float64,
193 Type::Date => arrow::datatypes::DataType::Date32,
194 Type::Time => {
195 arrow::datatypes::DataType::Time64(arrow::datatypes::TimeUnit::Microsecond)
196 }
197 Type::Timestamp { with_timezone } => {
198 let tz = if *with_timezone {
199 Some("UTC".into())
200 } else {
201 None
202 };
203 arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, tz)
204 }
205 Type::String => arrow::datatypes::DataType::Utf8,
206 Type::Uuid => arrow::datatypes::DataType::FixedSizeBinary(16),
207 Type::Binary => arrow::datatypes::DataType::Binary,
208 Type::Fixed { length } => arrow::datatypes::DataType::FixedSizeBinary(*length as i32),
209 Type::Decimal { precision, scale } => {
210 arrow::datatypes::DataType::Decimal128(*precision as u8, *scale as i8)
211 }
212 Type::Struct { fields } => {
213 let arrow_fields = fields.iter().map(|f| f.to_arrow_field()).collect();
214 arrow::datatypes::DataType::Struct(arrow_fields)
215 }
216 Type::List { element } => {
217 arrow::datatypes::DataType::List(std::sync::Arc::new(element.to_arrow_field()))
218 }
219 Type::Map { key, value } => {
220 let entries = arrow::datatypes::Field::new(
221 "entries",
222 arrow::datatypes::DataType::Struct(
223 vec![key.to_arrow_field(), value.to_arrow_field()].into(),
224 ),
225 false,
226 );
227 arrow::datatypes::DataType::Map(std::sync::Arc::new(entries), false)
228 }
229 }
230 }
231}
232
233pub struct SchemaBuilder {
235 schema_id: i32,
236 fields: Vec<Field>,
237 identifier_field_ids: Vec<i32>,
238}
239
240impl SchemaBuilder {
241 pub fn new(schema_id: i32) -> Self {
243 Self {
244 schema_id,
245 fields: Vec::new(),
246 identifier_field_ids: Vec::new(),
247 }
248 }
249
250 pub fn with_field(
252 mut self,
253 id: i32,
254 name: impl Into<String>,
255 field_type: Type,
256 required: bool,
257 ) -> Self {
258 self.fields.push(Field {
259 id,
260 name: name.into(),
261 required,
262 field_type,
263 doc: None,
264 default: None,
265 });
266 self
267 }
268
269 pub fn with_field_full(mut self, field: Field) -> Self {
271 self.fields.push(field);
272 self
273 }
274
275 pub fn with_identifier_fields(mut self, field_ids: impl IntoIterator<Item = i32>) -> Self {
277 self.identifier_field_ids.extend(field_ids);
278 self
279 }
280
281 pub fn build(self) -> Schema {
283 Schema {
284 schema_id: self.schema_id,
285 fields: self.fields,
286 identifier_field_ids: self.identifier_field_ids,
287 }
288 }
289}
290
291impl Schema {
292 pub fn to_arrow_schema(&self) -> arrow::datatypes::Schema {
294 let arrow_fields: Vec<arrow::datatypes::Field> =
295 self.fields.iter().map(|f| f.to_arrow_field()).collect();
296 arrow::datatypes::Schema::new(arrow_fields)
297 }
298
299 pub fn to_arrow_schema_ref(&self) -> arrow::datatypes::SchemaRef {
301 std::sync::Arc::new(self.to_arrow_schema())
302 }
303
304 pub fn to_df_schema(&self) -> datafusion::error::Result<datafusion::common::DFSchema> {
306 datafusion::common::DFSchema::try_from(self.to_arrow_schema())
307 }
308 pub fn builder(schema_id: i32) -> SchemaBuilder {
310 SchemaBuilder::new(schema_id)
311 }
312
313 pub fn find_field(&self, field_id: i32) -> Option<&Field> {
315 self.find_field_in_fields(&self.fields, field_id)
316 }
317
318 pub fn find_field_by_name(&self, name: &str) -> Option<&Field> {
320 self.fields.iter().find(|f| f.name == name)
321 }
322
323 pub fn highest_field_id(&self) -> i32 {
325 self.highest_field_id_in(&self.fields)
326 }
327
328 fn find_field_in_fields<'a>(&self, fields: &'a [Field], field_id: i32) -> Option<&'a Field> {
330 for field in fields {
331 if field.id == field_id {
332 return Some(field);
333 }
334 if let Some(nested) = self.find_in_nested_type(&field.field_type, field_id) {
336 return Some(nested);
337 }
338 }
339 None
340 }
341
342 fn find_in_nested_type<'a>(&self, field_type: &'a Type, field_id: i32) -> Option<&'a Field> {
344 match field_type {
345 Type::Struct { fields } => self.find_field_in_fields(fields, field_id),
346 Type::List { element } => {
347 if element.id == field_id {
348 Some(element.as_ref())
349 } else {
350 self.find_in_nested_type(&element.field_type, field_id)
351 }
352 }
353 Type::Map { key, value } => {
354 if key.id == field_id {
355 Some(key.as_ref())
356 } else if value.id == field_id {
357 Some(value.as_ref())
358 } else {
359 self.find_in_nested_type(&key.field_type, field_id)
360 .or_else(|| self.find_in_nested_type(&value.field_type, field_id))
361 }
362 }
363 _ => None,
364 }
365 }
366
367 fn highest_field_id_in(&self, fields: &[Field]) -> i32 {
369 let mut max_id = 0;
370 for field in fields {
371 max_id = max_id.max(field.id);
372 max_id = max_id.max(self.highest_in_type(&field.field_type));
373 }
374 max_id
375 }
376
377 fn highest_in_type(&self, field_type: &Type) -> i32 {
378 match field_type {
379 Type::Struct { fields } => self.highest_field_id_in(fields),
380 Type::List { element } => element.id.max(self.highest_in_type(&element.field_type)),
381 Type::Map { key, value } => key
382 .id
383 .max(value.id)
384 .max(self.highest_in_type(&key.field_type))
385 .max(self.highest_in_type(&value.field_type)),
386 _ => 0,
387 }
388 }
389}
390
391impl Type {
392 pub fn can_widen_to(&self, target: &Type) -> bool {
398 matches!(
399 (self, target),
400 (Type::Int, Type::Long) | (Type::Float, Type::Double)
401 )
402 }
403
404 pub fn is_primitive(&self) -> bool {
406 !matches!(
407 self,
408 Type::Struct { .. } | Type::List { .. } | Type::Map { .. }
409 )
410 }
411
412 pub fn is_nested(&self) -> bool {
414 !self.is_primitive()
415 }
416}
417
418pub struct SchemaUpdate {
420 changes: Vec<SchemaChange>,
421}
422
423#[derive(Debug)]
424enum SchemaChange {
425 AddColumn {
426 parent_id: Option<i32>,
427 name: String,
428 field_type: Type,
429 required: bool,
430 doc: Option<String>,
431 },
432 DeleteColumn {
433 field_id: i32,
434 },
435 RenameColumn {
436 field_id: i32,
437 new_name: String,
438 },
439 UpdateType {
440 field_id: i32,
441 new_type: Type,
442 },
443 MakeOptional {
444 field_id: i32,
445 },
446}
447
448impl SchemaUpdate {
449 pub fn new() -> Self {
450 Self {
451 changes: Vec::new(),
452 }
453 }
454
455 pub fn add_column(
457 mut self,
458 parent_id: Option<i32>,
459 name: impl Into<String>,
460 field_type: Type,
461 doc: Option<String>,
462 ) -> Self {
463 self.changes.push(SchemaChange::AddColumn {
464 parent_id,
465 name: name.into(),
466 field_type,
467 required: false, doc,
469 });
470 self
471 }
472
473 pub fn delete_column(mut self, field_id: i32) -> Self {
475 self.changes.push(SchemaChange::DeleteColumn { field_id });
476 self
477 }
478
479 pub fn rename_column(mut self, field_id: i32, new_name: impl Into<String>) -> Self {
481 self.changes.push(SchemaChange::RenameColumn {
482 field_id,
483 new_name: new_name.into(),
484 });
485 self
486 }
487
488 pub fn update_type(mut self, field_id: i32, new_type: Type) -> Self {
490 self.changes
491 .push(SchemaChange::UpdateType { field_id, new_type });
492 self
493 }
494
495 pub fn make_optional(mut self, field_id: i32) -> Self {
497 self.changes.push(SchemaChange::MakeOptional { field_id });
498 self
499 }
500
501 pub fn apply(
509 self,
510 base_schema: &Schema,
511 new_schema_id: i32,
512 next_column_id: &mut i32,
513 ) -> Result<Schema, String> {
514 let mut fields = base_schema.fields.clone();
515
516 for change in self.changes {
517 match change {
518 SchemaChange::AddColumn {
519 parent_id,
520 name,
521 field_type,
522 required,
523 doc,
524 } => {
525 let id = *next_column_id;
526 *next_column_id += 1; let new_field = Field {
532 id,
533 name,
534 required,
535 field_type,
536 doc,
537 default: None,
538 };
539
540 if let Some(pid) = parent_id {
541 Self::add_field_recursive(&mut fields, pid, new_field)?;
543 } else {
544 fields.push(new_field);
546 }
547 }
548 SchemaChange::DeleteColumn { field_id } => {
549 Self::delete_field_recursive(&mut fields, field_id);
550 }
551 SchemaChange::RenameColumn { field_id, new_name } => {
552 Self::rename_field_recursive(&mut fields, field_id, &new_name)?;
553 }
554 SchemaChange::UpdateType { field_id, new_type } => {
555 Self::update_type_recursive(&mut fields, field_id, new_type)?;
556 }
557 SchemaChange::MakeOptional { field_id } => {
558 Self::make_optional_recursive(&mut fields, field_id)?;
559 }
560 }
561 }
562
563 Ok(Schema {
564 schema_id: new_schema_id,
565 fields,
566 identifier_field_ids: base_schema.identifier_field_ids.clone(),
567 })
568 }
569
570 fn add_field_recursive(
571 fields: &mut Vec<Field>,
572 parent_id: i32,
573 new_field: Field,
574 ) -> Result<(), String> {
575 for field in fields {
576 if field.id == parent_id {
577 match &mut field.field_type {
578 Type::Struct { fields: children } => {
579 children.push(new_field);
580 return Ok(());
581 }
582 _ => return Err(format!("Parent field {} is not a struct", parent_id)),
583 }
584 }
585 if let Type::Struct { fields: children } = &mut field.field_type {
587 if Self::add_field_recursive(children, parent_id, new_field.clone()).is_ok() {
588 return Ok(());
589 }
590 }
591 }
592 Err(format!("Parent field {} not found", parent_id))
593 }
594
595 fn delete_field_recursive(fields: &mut Vec<Field>, target_id: i32) {
596 fields.retain(|f| f.id != target_id);
597 for field in fields {
598 if let Type::Struct { fields: children } = &mut field.field_type {
599 Self::delete_field_recursive(children, target_id);
600 }
601 }
602 }
603
604 fn rename_field_recursive(
605 fields: &mut [Field],
606 target_id: i32,
607 new_name: &str,
608 ) -> Result<(), String> {
609 for field in fields {
610 if field.id == target_id {
611 field.name = new_name.to_string();
612 return Ok(());
613 }
614 if let Type::Struct { fields: children } = &mut field.field_type {
615 if Self::rename_field_recursive(children, target_id, new_name).is_ok() {
616 return Ok(());
617 }
618 }
619 }
620 Err(format!("Field {} not found", target_id))
621 }
622
623 fn update_type_recursive(
624 fields: &mut [Field],
625 target_id: i32,
626 new_type: Type,
627 ) -> Result<(), String> {
628 for field in fields {
629 if field.id == target_id {
630 if !field.field_type.can_widen_to(&new_type) {
631 return Err(format!(
632 "Cannot change type {:?} to {:?} for base column {}",
633 field.field_type, new_type, target_id
634 ));
635 }
636 field.field_type = new_type;
637 return Ok(());
638 }
639 if let Type::Struct { fields: children } = &mut field.field_type {
640 if Self::update_type_recursive(children, target_id, new_type.clone()).is_ok() {
641 return Ok(());
642 }
643 }
644 }
645 Err(format!("Field {} not found", target_id))
646 }
647
648 fn make_optional_recursive(fields: &mut [Field], target_id: i32) -> Result<(), String> {
649 for field in fields {
650 if field.id == target_id {
651 field.required = false;
652 return Ok(());
653 }
654 if let Type::Struct { fields: children } = &mut field.field_type {
655 if Self::make_optional_recursive(children, target_id).is_ok() {
656 return Ok(());
657 }
658 }
659 }
660 Err(format!("Field {} not found", target_id))
661 }
662}
663
664impl Default for SchemaUpdate {
665 fn default() -> Self {
666 Self::new()
667 }
668}
669
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema 0 with a required `id: long` (ID 1) and an optional string column
    /// named `second` (ID 2).
    fn id_plus_string(second: &str) -> Schema {
        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, second, Type::String, false)
            .build()
    }

    /// A required string field with no doc and no default.
    fn required_string(id: i32, name: &str) -> Field {
        Field {
            id,
            name: name.to_string(),
            required: true,
            field_type: Type::String,
            doc: None,
            default: None,
        }
    }

    #[test]
    fn test_schema_builder() {
        let schema = id_plus_string("name");

        assert_eq!(schema.schema_id, 0);
        assert_eq!(schema.fields.len(), 2);
        let first = &schema.fields[0];
        assert_eq!(first.name, "id");
        assert!(first.required);
    }

    #[test]
    fn test_find_field() {
        let schema = id_plus_string("name");
        let found = schema.find_field(2).expect("field 2 should exist");
        assert_eq!(found.name, "name");
    }

    #[test]
    fn test_type_widening() {
        // Permitted promotions.
        assert!(Type::Int.can_widen_to(&Type::Long));
        assert!(Type::Float.can_widen_to(&Type::Double));
        // Narrowing and unrelated conversions are rejected.
        assert!(!Type::Long.can_widen_to(&Type::Int));
        assert!(!Type::String.can_widen_to(&Type::Int));
    }

    #[test]
    fn test_nested_struct() {
        let address = Field {
            id: 2,
            name: "address".to_string(),
            required: false,
            field_type: Type::Struct {
                fields: vec![required_string(10, "street"), required_string(11, "city")],
            },
            doc: None,
            default: None,
        };
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field_full(address)
            .build();

        // Struct members are reachable by ID.
        let street = schema.find_field(10).expect("nested field 10 should exist");
        assert_eq!(street.name, "street");
        // The highest ID lives inside the nested struct.
        assert_eq!(schema.highest_field_id(), 11);
    }

    #[test]
    fn test_serialization() {
        let amount_type = Type::Decimal {
            precision: 10,
            scale: 2,
        };
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "amount", amount_type, false)
            .build();

        let json = serde_json::to_string_pretty(&schema).unwrap();
        let round_tripped: Schema = serde_json::from_str(&json).unwrap();
        assert_eq!(schema, round_tripped);
    }

    #[test]
    fn test_schema_evolution() {
        let base = id_plus_string("data");
        let mut next_col_id = 3;

        let with_new_col = SchemaUpdate::new()
            .add_column(None, "new_col", Type::Boolean, None)
            .apply(&base, 1, &mut next_col_id)
            .unwrap();
        assert_eq!(with_new_col.fields.len(), 3);
        let added = &with_new_col.fields[2];
        assert_eq!(added.name, "new_col");
        assert_eq!(added.id, 3);
        assert_eq!(next_col_id, 4);

        let renamed = SchemaUpdate::new()
            .rename_column(2, "renamed_data")
            .apply(&with_new_col, 2, &mut next_col_id)
            .unwrap();
        let renamed_field = &renamed.fields[1];
        assert_eq!(renamed_field.name, "renamed_data");
        assert_eq!(renamed_field.id, 2);
    }
}