Skip to main content

supertable_core/
schema.rs

//! # SuperTable Schema
//!
//! This module defines the schema representation for SuperTable. It follows
//! Apache Iceberg's approach of using immutable field IDs, which enables safe
//! schema evolution.
//!
//! ## Design Principles
//!
//! 1. **Immutable Field IDs**: Each field has a unique ID that never changes,
//!    even if the field is renamed. This enables safe schema evolution.
//!
//! 2. **Nested Types**: Full support for complex nested structures, including
//!    structs, lists, and maps.
//!
//! 3. **Rich Metadata**: Optional documentation and default values for fields.
//!
//! ## Schema Evolution
//!
//! SuperTable supports the following schema changes:
//! - Adding new optional columns
//! - Dropping columns (the field is removed from the new schema, but its ID is never reused)
//! - Renaming columns (the ID stays the same)
//! - Widening types (int -> long, float -> double)
//! - Making required columns optional
24
25use serde::{Deserialize, Serialize};
26
27/// A schema defines the structure of records in a table.
28///
29/// Schemas are immutable once created. Schema evolution is achieved by
30/// creating new schemas and updating the table's current schema reference.
31///
32/// # Example
33///
34/// ```rust
35/// use supercore::schema::{Schema, Field, Type};
36///
37/// let schema = Schema::builder(0)
38///     .with_field(1, "id", Type::Long, true)
39///     .with_field(2, "name", Type::String, true)
40///     .with_field(3, "email", Type::String, false)
41///     .build();
42/// ```
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
44#[serde(rename_all = "kebab-case")]
45pub struct Schema {
46    /// Unique identifier for this schema version.
47    pub schema_id: i32,
48
49    /// The list of fields in this schema.
50    #[serde(default)]
51    pub fields: Vec<Field>,
52
53    /// Optional identifier field IDs (for tables with primary keys).
54    #[serde(default, skip_serializing_if = "Vec::is_empty")]
55    pub identifier_field_ids: Vec<i32>,
56}
57
58/// A field in a schema.
59///
60/// Fields are identified by their `id`, which is immutable and unique within
61/// a table's history. This enables safe schema evolution where fields can be
62/// renamed without breaking existing data files.
63#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64#[serde(rename_all = "kebab-case")]
65pub struct Field {
66    /// Unique, immutable identifier for this field.
67    /// IDs are never reused, even for dropped fields.
68    pub id: i32,
69
70    /// The field name. Can be changed during schema evolution.
71    pub name: String,
72
73    /// Whether the field is required (non-nullable).
74    pub required: bool,
75
76    /// The data type of this field.
77    #[serde(rename = "type")]
78    pub field_type: Type,
79
80    /// Optional documentation for this field.
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub doc: Option<String>,
83
84    /// Optional default value (JSON-encoded).
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub default: Option<serde_json::Value>,
87}
88
89impl Field {
90    /// Converts this field to an Arrow field.
91    pub fn to_arrow_field(&self) -> arrow::datatypes::Field {
92        arrow::datatypes::Field::new(
93            &self.name,
94            self.field_type.to_arrow_datatype(),
95            !self.required,
96        )
97    }
98}
99
100/// Data types supported by SuperTable.
101///
102/// These types are designed to be compatible with Apache Arrow and Parquet,
103/// enabling zero-copy data access and efficient storage.
104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
105#[serde(tag = "type", rename_all = "lowercase")]
106pub enum Type {
107    /// A boolean value (true/false).
108    Boolean,
109
110    /// A 32-bit signed integer.
111    Int,
112
113    /// A 64-bit signed integer.
114    Long,
115
116    /// A 32-bit IEEE 754 floating point number.
117    Float,
118
119    /// A 64-bit IEEE 754 floating point number.
120    Double,
121
122    /// A date without time (days since 1970-01-01).
123    Date,
124
125    /// A time without date (microseconds since midnight).
126    Time,
127
128    /// A timestamp with microsecond precision.
129    #[serde(rename_all = "kebab-case")]
130    Timestamp {
131        /// Whether the timestamp includes timezone information.
132        with_timezone: bool,
133    },
134
135    /// An arbitrary-length Unicode string.
136    String,
137
138    /// A universally unique identifier (128-bit).
139    Uuid,
140
141    /// Arbitrary binary data.
142    Binary,
143
144    /// A fixed-length binary value.
145    #[serde(rename_all = "kebab-case")]
146    Fixed {
147        /// The length in bytes.
148        length: u32,
149    },
150
151    /// A fixed-precision decimal number.
152    #[serde(rename_all = "kebab-case")]
153    Decimal {
154        /// Total number of digits.
155        precision: u32,
156        /// Number of digits after the decimal point.
157        scale: u32,
158    },
159
160    /// A struct (nested record) type.
161    #[serde(rename_all = "kebab-case")]
162    Struct {
163        /// The fields within this struct.
164        fields: Vec<Field>,
165    },
166
167    /// A list (array) type.
168    #[serde(rename_all = "kebab-case")]
169    List {
170        /// The element field (has its own ID).
171        element: Box<Field>,
172    },
173
174    /// A map type with key-value pairs.
175    #[serde(rename_all = "kebab-case")]
176    Map {
177        /// The key field (has its own ID).
178        key: Box<Field>,
179        /// The value field (has its own ID).
180        value: Box<Field>,
181    },
182}
183
184impl Type {
185    /// Converts this type to an Arrow data type.
186    pub fn to_arrow_datatype(&self) -> arrow::datatypes::DataType {
187        match self {
188            Type::Boolean => arrow::datatypes::DataType::Boolean,
189            Type::Int => arrow::datatypes::DataType::Int32,
190            Type::Long => arrow::datatypes::DataType::Int64,
191            Type::Float => arrow::datatypes::DataType::Float32,
192            Type::Double => arrow::datatypes::DataType::Float64,
193            Type::Date => arrow::datatypes::DataType::Date32,
194            Type::Time => {
195                arrow::datatypes::DataType::Time64(arrow::datatypes::TimeUnit::Microsecond)
196            }
197            Type::Timestamp { with_timezone } => {
198                let tz = if *with_timezone {
199                    Some("UTC".into())
200                } else {
201                    None
202                };
203                arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, tz)
204            }
205            Type::String => arrow::datatypes::DataType::Utf8,
206            Type::Uuid => arrow::datatypes::DataType::FixedSizeBinary(16),
207            Type::Binary => arrow::datatypes::DataType::Binary,
208            Type::Fixed { length } => arrow::datatypes::DataType::FixedSizeBinary(*length as i32),
209            Type::Decimal { precision, scale } => {
210                arrow::datatypes::DataType::Decimal128(*precision as u8, *scale as i8)
211            }
212            Type::Struct { fields } => {
213                let arrow_fields = fields.iter().map(|f| f.to_arrow_field()).collect();
214                arrow::datatypes::DataType::Struct(arrow_fields)
215            }
216            Type::List { element } => {
217                arrow::datatypes::DataType::List(std::sync::Arc::new(element.to_arrow_field()))
218            }
219            Type::Map { key, value } => {
220                let entries = arrow::datatypes::Field::new(
221                    "entries",
222                    arrow::datatypes::DataType::Struct(
223                        vec![key.to_arrow_field(), value.to_arrow_field()].into(),
224                    ),
225                    false,
226                );
227                arrow::datatypes::DataType::Map(std::sync::Arc::new(entries), false)
228            }
229        }
230    }
231}
232
233/// Builder for constructing `Schema` instances.
234pub struct SchemaBuilder {
235    schema_id: i32,
236    fields: Vec<Field>,
237    identifier_field_ids: Vec<i32>,
238}
239
240impl SchemaBuilder {
241    /// Creates a new schema builder with the given schema ID.
242    pub fn new(schema_id: i32) -> Self {
243        Self {
244            schema_id,
245            fields: Vec::new(),
246            identifier_field_ids: Vec::new(),
247        }
248    }
249
250    /// Adds a simple field to the schema.
251    pub fn with_field(
252        mut self,
253        id: i32,
254        name: impl Into<String>,
255        field_type: Type,
256        required: bool,
257    ) -> Self {
258        self.fields.push(Field {
259            id,
260            name: name.into(),
261            required,
262            field_type,
263            doc: None,
264            default: None,
265        });
266        self
267    }
268
269    /// Adds a fully-specified field to the schema.
270    pub fn with_field_full(mut self, field: Field) -> Self {
271        self.fields.push(field);
272        self
273    }
274
275    /// Marks fields as identifier (primary key) fields.
276    pub fn with_identifier_fields(mut self, field_ids: impl IntoIterator<Item = i32>) -> Self {
277        self.identifier_field_ids.extend(field_ids);
278        self
279    }
280
281    /// Builds the schema.
282    pub fn build(self) -> Schema {
283        Schema {
284            schema_id: self.schema_id,
285            fields: self.fields,
286            identifier_field_ids: self.identifier_field_ids,
287        }
288    }
289}
290
291impl Schema {
292    /// Converts this schema to an Arrow schema.
293    pub fn to_arrow_schema(&self) -> arrow::datatypes::Schema {
294        let arrow_fields: Vec<arrow::datatypes::Field> =
295            self.fields.iter().map(|f| f.to_arrow_field()).collect();
296        arrow::datatypes::Schema::new(arrow_fields)
297    }
298
299    /// Converts this schema to an Arrow SchemaRef.
300    pub fn to_arrow_schema_ref(&self) -> arrow::datatypes::SchemaRef {
301        std::sync::Arc::new(self.to_arrow_schema())
302    }
303
304    /// Converts this schema to a DataFusion DFSchema.
305    pub fn to_df_schema(&self) -> datafusion::error::Result<datafusion::common::DFSchema> {
306        datafusion::common::DFSchema::try_from(self.to_arrow_schema())
307    }
308    /// Creates a new schema builder.
309    pub fn builder(schema_id: i32) -> SchemaBuilder {
310        SchemaBuilder::new(schema_id)
311    }
312
313    /// Finds a field by its ID.
314    pub fn find_field(&self, field_id: i32) -> Option<&Field> {
315        self.find_field_in_fields(&self.fields, field_id)
316    }
317
318    /// Finds a field by its name.
319    pub fn find_field_by_name(&self, name: &str) -> Option<&Field> {
320        self.fields.iter().find(|f| f.name == name)
321    }
322
323    /// Returns the highest field ID in this schema.
324    pub fn highest_field_id(&self) -> i32 {
325        self.highest_field_id_in(&self.fields)
326    }
327
328    /// Recursively finds a field by ID in a list of fields.
329    fn find_field_in_fields<'a>(&self, fields: &'a [Field], field_id: i32) -> Option<&'a Field> {
330        for field in fields {
331            if field.id == field_id {
332                return Some(field);
333            }
334            // Search nested types
335            if let Some(nested) = self.find_in_nested_type(&field.field_type, field_id) {
336                return Some(nested);
337            }
338        }
339        None
340    }
341
342    /// Searches for a field ID within nested types.
343    fn find_in_nested_type<'a>(&self, field_type: &'a Type, field_id: i32) -> Option<&'a Field> {
344        match field_type {
345            Type::Struct { fields } => self.find_field_in_fields(fields, field_id),
346            Type::List { element } => {
347                if element.id == field_id {
348                    Some(element.as_ref())
349                } else {
350                    self.find_in_nested_type(&element.field_type, field_id)
351                }
352            }
353            Type::Map { key, value } => {
354                if key.id == field_id {
355                    Some(key.as_ref())
356                } else if value.id == field_id {
357                    Some(value.as_ref())
358                } else {
359                    self.find_in_nested_type(&key.field_type, field_id)
360                        .or_else(|| self.find_in_nested_type(&value.field_type, field_id))
361                }
362            }
363            _ => None,
364        }
365    }
366
367    /// Recursively finds the highest field ID.
368    fn highest_field_id_in(&self, fields: &[Field]) -> i32 {
369        let mut max_id = 0;
370        for field in fields {
371            max_id = max_id.max(field.id);
372            max_id = max_id.max(self.highest_in_type(&field.field_type));
373        }
374        max_id
375    }
376
377    fn highest_in_type(&self, field_type: &Type) -> i32 {
378        match field_type {
379            Type::Struct { fields } => self.highest_field_id_in(fields),
380            Type::List { element } => element.id.max(self.highest_in_type(&element.field_type)),
381            Type::Map { key, value } => key
382                .id
383                .max(value.id)
384                .max(self.highest_in_type(&key.field_type))
385                .max(self.highest_in_type(&value.field_type)),
386            _ => 0,
387        }
388    }
389}
390
391impl Type {
392    /// Returns true if this type can be widened to the target type.
393    ///
394    /// Widening is allowed for:
395    /// - int → long
396    /// - float → double
397    pub fn can_widen_to(&self, target: &Type) -> bool {
398        matches!(
399            (self, target),
400            (Type::Int, Type::Long) | (Type::Float, Type::Double)
401        )
402    }
403
404    /// Returns true if this is a primitive (non-nested) type.
405    pub fn is_primitive(&self) -> bool {
406        !matches!(
407            self,
408            Type::Struct { .. } | Type::List { .. } | Type::Map { .. }
409        )
410    }
411
412    /// Returns true if this is a nested type.
413    pub fn is_nested(&self) -> bool {
414        !self.is_primitive()
415    }
416}
417
418/// A pending schema update that can be applied to a schema to produce a new version.
419pub struct SchemaUpdate {
420    changes: Vec<SchemaChange>,
421}
422
423#[derive(Debug)]
424enum SchemaChange {
425    AddColumn {
426        parent_id: Option<i32>,
427        name: String,
428        field_type: Type,
429        required: bool,
430        doc: Option<String>,
431    },
432    DeleteColumn {
433        field_id: i32,
434    },
435    RenameColumn {
436        field_id: i32,
437        new_name: String,
438    },
439    UpdateType {
440        field_id: i32,
441        new_type: Type,
442    },
443    MakeOptional {
444        field_id: i32,
445    },
446}
447
448impl SchemaUpdate {
449    pub fn new() -> Self {
450        Self {
451            changes: Vec::new(),
452        }
453    }
454
455    /// adds a new column to the schema.
456    pub fn add_column(
457        mut self,
458        parent_id: Option<i32>,
459        name: impl Into<String>,
460        field_type: Type,
461        doc: Option<String>,
462    ) -> Self {
463        self.changes.push(SchemaChange::AddColumn {
464            parent_id,
465            name: name.into(),
466            field_type,
467            required: false, // New columns must be optional for compatibility
468            doc,
469        });
470        self
471    }
472
473    /// Deletes a column from the schema.
474    pub fn delete_column(mut self, field_id: i32) -> Self {
475        self.changes.push(SchemaChange::DeleteColumn { field_id });
476        self
477    }
478
479    /// Renames a column.
480    pub fn rename_column(mut self, field_id: i32, new_name: impl Into<String>) -> Self {
481        self.changes.push(SchemaChange::RenameColumn {
482            field_id,
483            new_name: new_name.into(),
484        });
485        self
486    }
487
488    /// Updates a column's type (must be compatible).
489    pub fn update_type(mut self, field_id: i32, new_type: Type) -> Self {
490        self.changes
491            .push(SchemaChange::UpdateType { field_id, new_type });
492        self
493    }
494
495    /// Makes a required column optional.
496    pub fn make_optional(mut self, field_id: i32) -> Self {
497        self.changes.push(SchemaChange::MakeOptional { field_id });
498        self
499    }
500
501    /// Applies the changes to a base schema, producing a new schema.
502    ///
503    /// # Arguments
504    ///
505    /// * `base_schema` - The schema to start from
506    /// * `new_schema_id` - The ID for the new schema
507    /// * `next_column_id` - A mutable reference to the next available column ID setter
508    pub fn apply(
509        self,
510        base_schema: &Schema,
511        new_schema_id: i32,
512        next_column_id: &mut i32,
513    ) -> Result<Schema, String> {
514        let mut fields = base_schema.fields.clone();
515
516        for change in self.changes {
517            match change {
518                SchemaChange::AddColumn {
519                    parent_id,
520                    name,
521                    field_type,
522                    required,
523                    doc,
524                } => {
525                    let id = *next_column_id;
526                    *next_column_id += 1; // Increment for next use, including nested fields if any
527
528                    // Note: For complex types we'd need to assign IDs to children too.
529                    // Simplified here assuming primitive types for now.
530
531                    let new_field = Field {
532                        id,
533                        name,
534                        required,
535                        field_type,
536                        doc,
537                        default: None,
538                    };
539
540                    if let Some(pid) = parent_id {
541                        // Find parent and add to its children
542                        Self::add_field_recursive(&mut fields, pid, new_field)?;
543                    } else {
544                        // Add to root
545                        fields.push(new_field);
546                    }
547                }
548                SchemaChange::DeleteColumn { field_id } => {
549                    Self::delete_field_recursive(&mut fields, field_id);
550                }
551                SchemaChange::RenameColumn { field_id, new_name } => {
552                    Self::rename_field_recursive(&mut fields, field_id, &new_name)?;
553                }
554                SchemaChange::UpdateType { field_id, new_type } => {
555                    Self::update_type_recursive(&mut fields, field_id, new_type)?;
556                }
557                SchemaChange::MakeOptional { field_id } => {
558                    Self::make_optional_recursive(&mut fields, field_id)?;
559                }
560            }
561        }
562
563        Ok(Schema {
564            schema_id: new_schema_id,
565            fields,
566            identifier_field_ids: base_schema.identifier_field_ids.clone(),
567        })
568    }
569
570    fn add_field_recursive(
571        fields: &mut Vec<Field>,
572        parent_id: i32,
573        new_field: Field,
574    ) -> Result<(), String> {
575        for field in fields {
576            if field.id == parent_id {
577                match &mut field.field_type {
578                    Type::Struct { fields: children } => {
579                        children.push(new_field);
580                        return Ok(());
581                    }
582                    _ => return Err(format!("Parent field {} is not a struct", parent_id)),
583                }
584            }
585            // Recurse into struct fields
586            if let Type::Struct { fields: children } = &mut field.field_type {
587                if Self::add_field_recursive(children, parent_id, new_field.clone()).is_ok() {
588                    return Ok(());
589                }
590            }
591        }
592        Err(format!("Parent field {} not found", parent_id))
593    }
594
595    fn delete_field_recursive(fields: &mut Vec<Field>, target_id: i32) {
596        fields.retain(|f| f.id != target_id);
597        for field in fields {
598            if let Type::Struct { fields: children } = &mut field.field_type {
599                Self::delete_field_recursive(children, target_id);
600            }
601        }
602    }
603
604    fn rename_field_recursive(
605        fields: &mut [Field],
606        target_id: i32,
607        new_name: &str,
608    ) -> Result<(), String> {
609        for field in fields {
610            if field.id == target_id {
611                field.name = new_name.to_string();
612                return Ok(());
613            }
614            if let Type::Struct { fields: children } = &mut field.field_type {
615                if Self::rename_field_recursive(children, target_id, new_name).is_ok() {
616                    return Ok(());
617                }
618            }
619        }
620        Err(format!("Field {} not found", target_id))
621    }
622
623    fn update_type_recursive(
624        fields: &mut [Field],
625        target_id: i32,
626        new_type: Type,
627    ) -> Result<(), String> {
628        for field in fields {
629            if field.id == target_id {
630                if !field.field_type.can_widen_to(&new_type) {
631                    return Err(format!(
632                        "Cannot change type {:?} to {:?} for base column {}",
633                        field.field_type, new_type, target_id
634                    ));
635                }
636                field.field_type = new_type;
637                return Ok(());
638            }
639            if let Type::Struct { fields: children } = &mut field.field_type {
640                if Self::update_type_recursive(children, target_id, new_type.clone()).is_ok() {
641                    return Ok(());
642                }
643            }
644        }
645        Err(format!("Field {} not found", target_id))
646    }
647
648    fn make_optional_recursive(fields: &mut [Field], target_id: i32) -> Result<(), String> {
649        for field in fields {
650            if field.id == target_id {
651                field.required = false;
652                return Ok(());
653            }
654            if let Type::Struct { fields: children } = &mut field.field_type {
655                if Self::make_optional_recursive(children, target_id).is_ok() {
656                    return Ok(());
657                }
658            }
659        }
660        Err(format!("Field {} not found", target_id))
661    }
662}
663
664impl Default for SchemaUpdate {
665    fn default() -> Self {
666        Self::new()
667    }
668}
669
#[cfg(test)]
mod tests {
    use super::*;

    /// Two-column fixture: required `id: long` (ID 1) and optional `<second>: string` (ID 2).
    fn two_columns(second: &str) -> Schema {
        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, second, Type::String, false)
            .build()
    }

    /// A required string field with no doc string and no default.
    fn required_string(id: i32, name: &str) -> Field {
        Field {
            id,
            name: name.into(),
            required: true,
            field_type: Type::String,
            doc: None,
            default: None,
        }
    }

    #[test]
    fn test_schema_builder() {
        let schema = two_columns("name");

        assert_eq!(schema.schema_id, 0);
        assert_eq!(schema.fields.len(), 2);
        let first = &schema.fields[0];
        assert_eq!(first.name, "id");
        assert!(first.required);
    }

    #[test]
    fn test_find_field() {
        let schema = two_columns("name");

        let found = schema.find_field(2).expect("field 2 should exist");
        assert_eq!(found.name, "name");
    }

    #[test]
    fn test_type_widening() {
        let allowed = [(Type::Int, Type::Long), (Type::Float, Type::Double)];
        for (from, to) in &allowed {
            assert!(from.can_widen_to(to));
        }
        assert!(!Type::Long.can_widen_to(&Type::Int));
        assert!(!Type::String.can_widen_to(&Type::Int));
    }

    #[test]
    fn test_nested_struct() {
        let address = Field {
            id: 2,
            name: "address".into(),
            required: false,
            field_type: Type::Struct {
                fields: vec![required_string(10, "street"), required_string(11, "city")],
            },
            doc: None,
            default: None,
        };
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field_full(address)
            .build();

        // Field 10 lives inside the `address` struct.
        let street = schema.find_field(10).expect("nested field 10 should exist");
        assert_eq!(street.name, "street");

        // The nested `city` field carries the highest ID.
        assert_eq!(schema.highest_field_id(), 11);
    }

    #[test]
    fn test_serialization() {
        let amount_type = Type::Decimal {
            precision: 10,
            scale: 2,
        };
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "amount", amount_type, false)
            .build();

        let json = serde_json::to_string_pretty(&schema).unwrap();
        let round_tripped: Schema = serde_json::from_str(&json).unwrap();

        assert_eq!(schema, round_tripped);
    }

    #[test]
    fn test_schema_evolution() {
        let base = two_columns("data");
        let mut next_col_id = 3;

        // Add a column.
        let with_new_col = SchemaUpdate::new()
            .add_column(None, "new_col", Type::Boolean, None)
            .apply(&base, 1, &mut next_col_id)
            .unwrap();

        assert_eq!(with_new_col.fields.len(), 3);
        let added = &with_new_col.fields[2];
        assert_eq!(added.name, "new_col");
        assert_eq!(added.id, 3);
        assert_eq!(next_col_id, 4);

        // Rename a column.
        let renamed = SchemaUpdate::new()
            .rename_column(2, "renamed_data")
            .apply(&with_new_col, 2, &mut next_col_id)
            .unwrap();

        let data = &renamed.fields[1];
        assert_eq!(data.name, "renamed_data");
        assert_eq!(data.id, 2);
    }
}