Skip to main content

buoyant_kernel/schema/
mod.rs

1//! Definitions and functions to create and manipulate kernel schema
2
3use std::borrow::Cow;
4use std::collections::{HashMap, HashSet};
5use std::fmt::{Debug, Display, Formatter};
6use std::iter::{DoubleEndedIterator, FusedIterator};
7use std::str::FromStr;
8use std::sync::{Arc, LazyLock};
9
10use delta_kernel_derive::internal_api;
11use indexmap::IndexMap;
12use itertools::Itertools;
13use serde::{Deserialize, Serialize};
14use tracing::warn;
15
16// re-export because many call sites that use schemas do not necessarily use expressions
17pub(crate) use crate::expressions::{column_name, ColumnName};
18use crate::reserved_field_ids::FILE_NAME;
19use crate::table_features::{
20    validate_and_extract_column_mapping_annotations, validate_column_mapping_id, ColumnMappingMode,
21    SeenColumnMappingAnnotations,
22};
23use crate::transforms::{transform_output_type, SchemaTransform};
24use crate::utils::require;
25use crate::{DeltaResult, Error};
26
27pub(crate) mod compare;
28#[cfg(feature = "schema-diff")]
29pub(crate) mod diff;
30
31#[cfg(feature = "internal-api")]
32pub mod derive_macro_utils;
33#[cfg(not(feature = "internal-api"))]
34pub(crate) mod derive_macro_utils;
35pub(crate) mod validation;
36pub(crate) mod variant_utils;
37
38pub type Schema = StructType;
39pub type SchemaRef = Arc<StructType>;
40
41/// Converts a type to a [`Schema`] that represents that type. Derivable for struct types using the
42/// [`delta_kernel_derive::ToSchema`] derive macro.
43#[internal_api]
44pub(crate) trait ToSchema {
45    fn to_schema() -> StructType;
46}
47
48#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
49#[serde(untagged)]
50pub enum MetadataValue {
51    Number(i64),
52    String(String),
53    Boolean(bool),
54    // The [PROTOCOL](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-field) states
55    // only that the metadata is "A JSON map containing information about this column.", so we can
56    // actually have any valid json here. `Other` is therefore a catchall for things we don't need
57    // to handle.
58    Other(serde_json::Value),
59}
60
61impl Display for MetadataValue {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        match self {
64            MetadataValue::Number(n) => write!(f, "{n}"),
65            MetadataValue::String(s) => write!(f, "{s}"),
66            MetadataValue::Boolean(b) => write!(f, "{b}"),
67            MetadataValue::Other(v) => write!(f, "{v}"), // just write the json back
68        }
69    }
70}
71
72impl From<String> for MetadataValue {
73    fn from(value: String) -> Self {
74        Self::String(value)
75    }
76}
77
78impl From<&String> for MetadataValue {
79    fn from(value: &String) -> Self {
80        Self::String(value.clone())
81    }
82}
83
84impl From<&str> for MetadataValue {
85    fn from(value: &str) -> Self {
86        Self::String(value.to_string())
87    }
88}
89
90impl From<i64> for MetadataValue {
91    fn from(value: i64) -> Self {
92        Self::Number(value)
93    }
94}
95
96impl From<bool> for MetadataValue {
97    fn from(value: bool) -> Self {
98        Self::Boolean(value)
99    }
100}
101
102#[derive(Debug)]
103pub enum ColumnMetadataKey {
104    ColumnMappingId,
105    ColumnMappingPhysicalName,
106    /// Parquet field IDs for the synthesized `element` / `key` / `value` fields of an Array or
107    /// Map. Stored on the *nearest ancestor* StructField as a JSON object whose keys are
108    /// dot-paths rooted at that field's name.
109    ///
110    /// # Example: list-in-map
111    ///
112    /// For `m: map<int, array<int>>` and the key/value/element fields having field ids
113    /// 100/101/102, the metadata on `m` should be:
114    ///
115    /// ```json
116    /// {
117    ///   "delta.columnMapping.nested.ids": {
118    ///     "m.key":           100,
119    ///     "m.value":         101,
120    ///     "m.value.element": 102
121    ///   }
122    /// }
123    /// ```
124    ColumnMappingNestedIds,
125    ParquetFieldId,
126    ParquetFieldNestedIds,
127    GenerationExpression,
128    IdentityStart,
129    IdentityStep,
130    IdentityHighWaterMark,
131    IdentityAllowExplicitInsert,
132    InternalColumn,
133    Invariants,
134    MetadataSpec,
135}
136
137impl AsRef<str> for ColumnMetadataKey {
138    fn as_ref(&self) -> &str {
139        match self {
140            Self::ColumnMappingId => "delta.columnMapping.id",
141            Self::ColumnMappingPhysicalName => "delta.columnMapping.physicalName",
142            Self::ColumnMappingNestedIds => "delta.columnMapping.nested.ids",
143            // "parquet.field.id" is not defined by the Delta protocol, but follows the convention
144            // established by delta-spark and other Delta ecosystem implementations for storing
145            // Parquet field IDs in StructField metadata.
146            Self::ParquetFieldId => "parquet.field.id",
147            // The Delta protocol defines this key for IcebergCompatV2/V3 nested field ids. It is
148            // legacy and will be replaced by `delta.columnMapping.nested.ids` (which kernel
149            // uses everywhere). Kept here for protocol compatibility only.
150            // Tracking issue: <https://github.com/delta-io/delta/issues/6688>
151            Self::ParquetFieldNestedIds => "parquet.field.nested.ids",
152            Self::GenerationExpression => "delta.generationExpression",
153            Self::IdentityAllowExplicitInsert => "delta.identity.allowExplicitInsert",
154            Self::IdentityHighWaterMark => "delta.identity.highWaterMark",
155            Self::IdentityStart => "delta.identity.start",
156            Self::IdentityStep => "delta.identity.step",
157            Self::InternalColumn => "delta.isInternalColumn",
158            Self::Invariants => "delta.invariants",
159            Self::MetadataSpec => "delta.metadataSpec",
160        }
161    }
162}
163
164/// Enumeration of metadata columns recognized by Delta Kernel.
165///
166/// Metadata columns provide additional information about rows in a Delta table.
167#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
168pub enum MetadataColumnSpec {
169    RowIndex,
170    RowId,
171    RowCommitVersion,
172    FilePath,
173}
174
175impl MetadataColumnSpec {
176    /// A human-readable name for the specified metadata column.
177    pub fn text_value(&self) -> &'static str {
178        match self {
179            Self::RowIndex => "row_index",
180            Self::RowId => "row_id",
181            Self::RowCommitVersion => "row_commit_version",
182            Self::FilePath => "_file",
183        }
184    }
185
186    /// The data type of the specified metadata column.
187    pub fn data_type(&self) -> DataType {
188        match self {
189            Self::RowIndex => DataType::LONG,
190            Self::RowId => DataType::LONG,
191            Self::RowCommitVersion => DataType::LONG,
192            Self::FilePath => DataType::STRING,
193        }
194    }
195
196    /// Whether the specified metadata column is nullable.
197    pub fn nullable(&self) -> bool {
198        match self {
199            Self::RowIndex => false,
200            Self::RowId => false,
201            Self::RowCommitVersion => false,
202            Self::FilePath => false,
203        }
204    }
205
206    /// The reserved field ID for the specified metadata column, if any.
207    pub fn reserved_field_id(&self) -> Option<i64> {
208        match self {
209            Self::FilePath => Some(FILE_NAME),
210            _ => None,
211        }
212    }
213}
214
215impl FromStr for MetadataColumnSpec {
216    type Err = Error;
217
218    fn from_str(s: &str) -> Result<Self, Self::Err> {
219        match s {
220            "row_index" => Ok(Self::RowIndex),
221            "row_id" => Ok(Self::RowId),
222            "row_commit_version" => Ok(Self::RowCommitVersion),
223            "_file" => Ok(Self::FilePath),
224            _ => Err(Error::Schema(format!("Unknown metadata column spec: {s}"))),
225        }
226    }
227}
228
229#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
230pub struct StructField {
231    /// Name of this (possibly nested) column
232    pub name: String,
233    /// The data type of this field
234    #[serde(rename = "type")]
235    pub data_type: DataType,
236    /// Denotes whether this Field can be null
237    pub nullable: bool,
238    /// A JSON map containing information about this column
239    pub metadata: HashMap<String, MetadataValue>,
240}
241
242/// Parsed (and validated) pre-existing column-mapping annotations on a single field, as
243/// returned by [`StructField::validate_and_extract_existing_column_mapping_annotations`].
244/// Either, both, or neither of the two fields may be present; a field with neither set has no
245/// pre-populated column-mapping metadata.
246#[derive(Debug, Clone, Copy)]
247pub(crate) struct ExistingColumnMappingAnnotations<'a> {
248    /// Parsed `delta.columnMapping.id`, if present. Guaranteed non-negative.
249    pub id: Option<i64>,
250    /// Borrowed `delta.columnMapping.physicalName`, if present. Guaranteed non-empty.
251    pub physical_name: Option<&'a str>,
252}
253
254impl StructField {
255    /// The name of the default row index metadata column.
256    ///
257    /// Note that the dot does not indicate a nested field, it is just a separator for the metadata
258    /// column name.
259    const DEFAULT_ROW_INDEX_COLUMN_NAME: &'static str = "_metadata.row_index";
260
261    /////////////////
262    // Static methods
263    /////////////////
264
265    /// Creates a new field
266    pub fn new(name: impl Into<String>, data_type: impl Into<DataType>, nullable: bool) -> Self {
267        Self {
268            name: name.into(),
269            data_type: data_type.into(),
270            nullable,
271            metadata: HashMap::default(),
272        }
273    }
274
275    /// Creates a new nullable field
276    pub fn nullable(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
277        Self::new(name, data_type, true)
278    }
279
280    /// Creates a new non-nullable field
281    pub fn not_null(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
282        Self::new(name, data_type, false)
283    }
284
285    /// Creates a metadata column of the given spec with the given name.
286    pub fn create_metadata_column(name: impl Into<String>, spec: MetadataColumnSpec) -> Self {
287        let mut metadata = HashMap::new();
288        metadata.insert(
289            ColumnMetadataKey::MetadataSpec.as_ref().to_string(),
290            MetadataValue::String(spec.text_value().to_string()),
291        );
292
293        Self {
294            name: name.into(),
295            data_type: spec.data_type(),
296            nullable: spec.nullable(),
297            metadata,
298        }
299    }
300
301    /// Returns the default row index metadata column used by Kernel.
302    pub fn default_row_index_column() -> &'static StructField {
303        static DEFAULT_ROW_INDEX_COLUMN: LazyLock<StructField> = LazyLock::new(|| {
304            StructField::create_metadata_column(
305                StructField::DEFAULT_ROW_INDEX_COLUMN_NAME,
306                MetadataColumnSpec::RowIndex,
307            )
308        });
309        &DEFAULT_ROW_INDEX_COLUMN
310    }
311
312    ///////////////////
313    // Instance methods
314    ///////////////////
315
316    /// Replaces `self.metadata` with the list of <key, value> pairs in `metadata`.
317    pub fn with_metadata(
318        mut self,
319        metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
320    ) -> Self {
321        self.metadata = metadata
322            .into_iter()
323            .map(|(k, v)| (k.into(), v.into()))
324            .collect();
325        self
326    }
327
328    /// Extends `self.metadata` to include the <key, value> pairs in `metadata`.
329    pub fn add_metadata(
330        mut self,
331        metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
332    ) -> Self {
333        self.metadata
334            .extend(metadata.into_iter().map(|(k, v)| (k.into(), v.into())));
335        self
336    }
337
338    /// Returns true if this field is a metadata column.
339    pub fn is_metadata_column(&self) -> bool {
340        self.metadata
341            .contains_key(ColumnMetadataKey::MetadataSpec.as_ref())
342    }
343
344    /// Returns the metadata column spec if this is a metadata column, otherwise returns None.
345    pub fn get_metadata_column_spec(&self) -> Option<MetadataColumnSpec> {
346        match self
347            .metadata
348            .get(ColumnMetadataKey::MetadataSpec.as_ref())?
349        {
350            MetadataValue::String(s) => MetadataColumnSpec::from_str(s).ok(),
351            _ => None,
352        }
353    }
354
355    /// Returns true if this field is an internal column added by Kernel.
356    ///
357    /// Internal columns must be removed before returning scan results to the user.
358    pub fn is_internal_column(&self) -> bool {
359        matches!(
360            self.metadata
361                .get(ColumnMetadataKey::InternalColumn.as_ref()),
362            Some(MetadataValue::Boolean(true))
363        )
364    }
365
366    /// Marks this field as an internal column.
367    pub fn as_internal_column(self) -> Self {
368        self.add_metadata(vec![(
369            ColumnMetadataKey::InternalColumn.as_ref().to_string(),
370            MetadataValue::Boolean(true),
371        )])
372    }
373
374    pub fn get_config_value(&self, key: &ColumnMetadataKey) -> Option<&MetadataValue> {
375        self.metadata.get(key.as_ref())
376    }
377
378    /// Returns this field's `delta.columnMapping.id` annotation if present and well-formed.
379    /// Returns `None` if the annotation is missing or carries a non-numeric value.
380    pub fn column_mapping_id(&self) -> Option<i64> {
381        match self.get_config_value(&ColumnMetadataKey::ColumnMappingId)? {
382            MetadataValue::Number(n) => Some(*n),
383            _ => None,
384        }
385    }
386
387    /// Validates and extracts pre-existing column-mapping annotations on this field, returning
388    /// the parsed `id` and `physical_name` borrowed from the field's metadata. Returning the
389    /// parsed values lets the column-mapping assignment dispatch match on
390    /// `(Option<i64>, Option<&str>)` directly, which makes the dispatch total and obviates a
391    /// catch-all panic for malformed annotations the validator already rejects.
392    ///
393    /// Rejects:
394    /// - `delta.columnMapping.id` is present but not a `MetadataValue::Number`,
395    /// - `delta.columnMapping.id` is a `Number` but lies outside the protocol's 32-bit non-negative
396    ///   range (negative or `> i32::MAX`); see
397    ///   [`crate::table_features::validate_column_mapping_id`],
398    /// - `delta.columnMapping.physicalName` is present but not a `MetadataValue::String`,
399    /// - `delta.columnMapping.physicalName` is an empty `String`.
400    ///
401    /// Empty-name, negative-id, and over-`i32::MAX` id are stricter than delta-spark (which
402    /// accepts the first two and historically truncates the third); kernel fails fast at
403    /// write time so a connector that supplies bad metadata learns about it on the call that
404    /// produced it. Wrong-typed `id` errors take precedence over wrong-typed `physicalName`
405    /// errors (a connector that fixes the `id` and retries will then see the `physicalName`
406    /// error).
407    pub(crate) fn validate_and_extract_existing_column_mapping_annotations(
408        &self,
409    ) -> DeltaResult<ExistingColumnMappingAnnotations<'_>> {
410        let id = match self.get_config_value(&ColumnMetadataKey::ColumnMappingId) {
411            Some(MetadataValue::Number(n)) => {
412                validate_column_mapping_id(*n)
413                    .map_err(|e| Error::schema(format!("Field '{}': {e}", self.name)))?;
414                Some(*n)
415            }
416            None => None,
417            Some(_) => {
418                return Err(Error::schema(format!(
419                    "Field '{}' has a non-numeric `{}` annotation",
420                    self.name,
421                    ColumnMetadataKey::ColumnMappingId.as_ref(),
422                )));
423            }
424        };
425        let physical_name =
426            match self.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) {
427                Some(MetadataValue::String(s)) if s.is_empty() => {
428                    return Err(Error::schema(format!(
429                        "Field '{}' has an empty `{}` annotation",
430                        self.name,
431                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
432                    )));
433                }
434                Some(MetadataValue::String(s)) => Some(s.as_str()),
435                None => None,
436                Some(_) => {
437                    return Err(Error::schema(format!(
438                        "Field '{}' has a non-string `{}` annotation",
439                        self.name,
440                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
441                    )));
442                }
443            };
444        Ok(ExistingColumnMappingAnnotations { id, physical_name })
445    }
446
447    /// Recursively collects every `delta.columnMapping.id` reachable from this field --
448    /// the field's own ID plus any nested struct fields under Struct/Array/Map/Variant.
449    /// Test-only.
450    #[cfg(any(test, feature = "test-utils"))]
451    pub fn collect_column_mapping_ids(&self) -> Vec<i64> {
452        struct CollectIds(Vec<i64>);
453        impl<'a> SchemaTransform<'a> for CollectIds {
454            transform_output_type!(|'a, T| ());
455
456            fn transform_struct_field(&mut self, field: &'a StructField) {
457                if let Some(id) = field.column_mapping_id() {
458                    self.0.push(id);
459                }
460                self.recurse_into_struct_field(field)
461            }
462        }
463        let mut visitor = CollectIds(Vec::new());
464        visitor.transform_struct_field(self);
465        visitor.0
466    }
467
468    /// Get the physical name for this field as it should be read from parquet.
469    ///
470    /// When `column_mapping_mode` is `None`, always returns the logical name (even if physical
471    /// name metadata is present). When mode is `Id` or `Name`, returns the physical name from
472    /// metadata if present, otherwise returns the logical name.
473    ///
474    /// NOTE: Caller affirms that the schema was already validated by
475    /// [`crate::table_configuration::TableConfiguration::try_new`], to ensure that annotations are
476    /// always and only present when column mapping mode is enabled.
477    #[internal_api]
478    pub(crate) fn physical_name(&self, column_mapping_mode: ColumnMappingMode) -> &str {
479        match column_mapping_mode {
480            ColumnMappingMode::None => &self.name,
481            ColumnMappingMode::Id | ColumnMappingMode::Name => {
482                match self
483                    .metadata
484                    .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
485                {
486                    Some(MetadataValue::String(physical_name)) => physical_name,
487                    _ => &self.name,
488                }
489            }
490        }
491    }
492
493    /// Returns true if this field has a physical name annotation
494    /// in its column mapping metadata.
495    pub(crate) fn has_physical_name_annotation(&self) -> bool {
496        matches!(
497            self.metadata
498                .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()),
499            Some(MetadataValue::String(_))
500        )
501    }
502
503    /// Returns true if this field has a column mapping ID annotation
504    /// in its column mapping metadata.
505    pub(crate) fn has_id_annotation(&self) -> bool {
506        matches!(
507            self.metadata
508                .get(ColumnMetadataKey::ColumnMappingId.as_ref()),
509            Some(MetadataValue::Number(_))
510        )
511    }
512
513    /// Change the name of a field. The field will preserve its data type and nullability. Note that
514    /// this allocates a new field.
515    pub fn with_name(&self, new_name: impl Into<String>) -> Self {
516        StructField {
517            name: new_name.into(),
518            data_type: self.data_type().clone(),
519            nullable: self.nullable,
520            metadata: self.metadata.clone(),
521        }
522    }
523
524    #[inline]
525    pub fn name(&self) -> &String {
526        &self.name
527    }
528
529    #[inline]
530    pub fn is_nullable(&self) -> bool {
531        self.nullable
532    }
533
534    #[inline]
535    pub const fn data_type(&self) -> &DataType {
536        &self.data_type
537    }
538
539    #[inline]
540    pub const fn metadata(&self) -> &HashMap<String, MetadataValue> {
541        &self.metadata
542    }
543
544    /// Convert our metadata into a HashMap<String, String>. Note this copies all the data so can be
545    /// expensive for large metadata
546    pub fn metadata_with_string_values(&self) -> HashMap<String, String> {
547        self.metadata
548            .iter()
549            .map(|(key, val)| (key.clone(), val.to_string()))
550            .collect()
551    }
552
553    /// Applies physical name and field ID mappings to this field.
554    ///
555    /// This function sets the field ID for the physical [`StructField`] only if the
556    /// `column_mapping_mode` is `Id`. The field ID is specified using the
557    /// [`ColumnMetadataKey::ParquetFieldId`] metadata field. Readers should use
558    /// [`ColumnMetadataKey::ParquetFieldId`] to match fields to the Parquet schema.
559    /// If a physical StructField contains a field ID, the reader must resolve columns
560    /// with that ID. Otherwise, the physical StructField's name is used. For details,
561    /// see [`read_parquet_files`].
562    ///
563    /// This function also sets the physical name of a field. If `column_mapping_mode` is
564    /// `Id` or `Name`, this is specified in [`ColumnMetadataKey::ColumnMappingPhysicalName`].
565    /// Otherwise, the field's logical name is used.
566    ///
567    /// Returns an error if a field has invalid or inconsistent column mapping annotations (e.g.
568    /// missing when column mapping is enabled, present when disabled, or wrong type), or if a
569    /// metadata column is encountered (metadata columns should not participate in column mapping).
570    ///
571    /// [`read_parquet_files`]: crate::ParquetHandler::read_parquet_files
572    #[internal_api]
573    pub(crate) fn make_physical(
574        &self,
575        column_mapping_mode: ColumnMappingMode,
576    ) -> DeltaResult<Self> {
577        MakePhysical::new(column_mapping_mode)
578            .transform_struct_field(self)
579            .map(|f| f.into_owned())
580    }
581
582    pub(crate) fn has_invariants(&self) -> bool {
583        self.metadata
584            .contains_key(ColumnMetadataKey::Invariants.as_ref())
585    }
586
587    /// Converts logical schema StructField metadata to physical schema metadata
588    /// based on the specified `column_mapping_mode`.
589    ///
590    /// NOTE: Must not be called on metadata columns, which are not subject to column mapping.
591    ///
592    /// NOTE: Caller affirms that `self` was already validated by
593    /// [`crate::table_features::validate_and_extract_column_mapping_annotations`], to ensure that
594    /// annotations are always and only present when column mapping mode is enabled.
595    fn logical_to_physical_metadata(
596        &self,
597        column_mapping_mode: ColumnMappingMode,
598    ) -> HashMap<String, MetadataValue> {
599        let mut base_metadata = self.metadata.clone();
600        let physical_name_key = ColumnMetadataKey::ColumnMappingPhysicalName.as_ref();
601        let field_id_key = ColumnMetadataKey::ColumnMappingId.as_ref();
602        let parquet_field_id_key = ColumnMetadataKey::ParquetFieldId.as_ref();
603        let field_id = base_metadata.get(ColumnMetadataKey::ColumnMappingId.as_ref());
604        match column_mapping_mode {
605            ColumnMappingMode::Id => {
606                let Some(MetadataValue::Number(fid)) = field_id else {
607                    // `validate_and_extract_column_mapping_annotations` should have verified that
608                    // this has a field Id
609                    warn!("StructField with name {} is missing field id in the Id column mapping mode", self.name());
610                    debug_assert!(false);
611                    return base_metadata;
612                };
613                // Insert the parquet field id matching the column mapping id
614                base_metadata.insert(
615                    parquet_field_id_key.to_string(),
616                    MetadataValue::Number(*fid),
617                );
618                // Ensure that physical name is present
619                debug_assert!(base_metadata.contains_key(physical_name_key));
620            }
621            ColumnMappingMode::Name => {
622                // Logical metadata should have the column mapping metadata keys
623                debug_assert!(base_metadata.contains_key(physical_name_key));
624                debug_assert!(base_metadata.contains_key(field_id_key));
625
626                // Retain column mapping id and insert parquet field id so that
627                // Parquet files carry field IDs in Name mode as well (matching
628                // the Delta protocol requirement and Delta Spark behaviour).
629                let Some(MetadataValue::Number(fid)) = field_id else {
630                    warn!("StructField with name {} is missing field id in the Name column mapping mode", self.name());
631                    debug_assert!(false);
632                    return base_metadata;
633                };
634                base_metadata.insert(
635                    parquet_field_id_key.to_string(),
636                    MetadataValue::Number(*fid),
637                );
638                // TODO(#1070): Remove nested column ids when they are supported in kernel
639            }
640            ColumnMappingMode::None => {
641                base_metadata.remove(physical_name_key);
642                base_metadata.remove(field_id_key);
643                base_metadata.remove(parquet_field_id_key);
644                // TODO(#1070): Remove nested column ids when they are supported in kernel
645            }
646        }
647        base_metadata
648    }
649}
650
651impl Display for StructField {
652    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
653        let mut metadata_str = String::from("{");
654        let mut first = true;
655        for (k, v) in self.metadata.iter() {
656            if !first {
657                metadata_str.push_str(", ");
658            }
659            first = false;
660            metadata_str.push_str(&format!("{k}: {v:?}"));
661        }
662        metadata_str.push('}');
663        write!(
664            f,
665            "{}: {} (is nullable: {}, metadata: {})",
666            self.name, self.data_type, self.nullable, metadata_str,
667        )
668    }
669}
670
671/// A struct is used to represent both the top-level schema of the table
672/// as well as struct columns that contain nested columns.
673#[derive(Debug, PartialEq, Clone, Eq)]
674pub struct StructType {
675    type_name: String,
676    /// The fields stored in this struct
677    // We use indexmap to preserve the order of fields as they are defined in the schema
678    // while also allowing for fast lookup by name. The alternative is to do a linear search
679    // for each field by name would be potentially quite expensive for large schemas.
680    fields: IndexMap<String, StructField>,
681    /// The metadata columns in this struct
682    // We use a dedicated map for metadata columns to allow for fast lookup without having to
683    // iterate over all fields.
684    metadata_columns: HashMap<MetadataColumnSpec, usize>,
685}
686
687pub struct StructTypeBuilder {
688    fields: IndexMap<String, StructField>,
689}
690
691impl Default for StructTypeBuilder {
692    fn default() -> Self {
693        Self::new()
694    }
695}
696
697impl StructTypeBuilder {
698    pub fn new() -> Self {
699        Self {
700            fields: IndexMap::new(),
701        }
702    }
703
704    pub fn from_schema(schema: &StructType) -> Self {
705        Self {
706            fields: schema.fields.clone(),
707        }
708    }
709
710    pub fn add_field(mut self, field: StructField) -> Self {
711        self.fields.insert(field.name.clone(), field);
712        self
713    }
714
715    pub fn build(self) -> DeltaResult<StructType> {
716        StructType::try_new(self.fields.into_values())
717    }
718
719    pub fn build_arc_unchecked(self) -> Arc<StructType> {
720        Arc::new(StructType::new_unchecked(self.fields.into_values()))
721    }
722}
723
724impl StructType {
725    /// Creates a new [`StructType`] from the given fields.
726    ///
727    /// Returns an error if:
728    /// - the schema contains duplicate field names (case-insensitive; Delta column names are
729    ///   case-insensitive per the protocol)
730    /// - the schema contains duplicate metadata columns
731    /// - the schema contains nested metadata columns
732    pub fn try_new(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
733        let mut field_map = IndexMap::new();
734        let mut metadata_columns = HashMap::new();
735        let mut seen_lowercase_names = HashSet::new();
736
737        // Validate each field during insertion
738        for (i, field) in fields.into_iter().enumerate() {
739            // Verify that there are no nested metadata columns
740            if !matches!(field.data_type, DataType::Primitive(_)) {
741                Self::ensure_no_metadata_columns_in_field(&field)?;
742            }
743
744            // Check for duplicate metadata columns
745            if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
746                if metadata_columns.insert(metadata_column_spec, i).is_some() {
747                    return Err(Error::schema(format!(
748                        "Duplicate metadata column: {metadata_column_spec:?}",
749                    )));
750                }
751            }
752
753            // Delta column names are case-insensitive; reject schemas with duplicates that differ
754            // only by case.
755            let key = field.name.to_lowercase();
756            if !seen_lowercase_names.insert(key) {
757                return Err(Error::schema(format!(
758                    "Duplicate field name (case-insensitive): '{}'",
759                    field.name
760                )));
761            }
762
763            field_map.insert(field.name.clone(), field);
764        }
765
766        Ok(Self {
767            type_name: "struct".into(),
768            fields: field_map,
769            metadata_columns,
770        })
771    }
772
773    /// Creates a new [`StructType`] from a fallible iterator of fields.
774    ///
775    /// This constructor collects all fields from the iterator, returning the first error
776    /// encountered, or a new [`StructType`] if all fields are successfully collected and validated.
777    pub fn try_from_results<E: Into<Error>>(
778        fields: impl IntoIterator<Item = Result<StructField, E>>,
779    ) -> DeltaResult<Self> {
780        fields
781            .into_iter()
782            .map(|result| result.map_err(Into::into))
783            .process_results(|iter| Self::try_new(iter))?
784    }
785
786    pub fn builder() -> StructTypeBuilder {
787        StructTypeBuilder::new()
788    }
789
790    /// Creates a new [`StructType`] from the given fields without validating them.
791    ///
792    /// This should only be used when you are sure that the fields are valid.
793    /// Refer to [`StructType::try_new`] for more details on the validation checks.
794    #[internal_api]
795    pub(crate) fn new_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
796        let mut field_map = IndexMap::new();
797        let mut metadata_columns = HashMap::new();
798
799        for (i, field) in fields.into_iter().enumerate() {
800            if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
801                metadata_columns.insert(metadata_column_spec, i);
802            }
803            field_map.insert(field.name.clone(), field);
804        }
805
806        Self {
807            type_name: "struct".into(),
808            fields: field_map,
809            metadata_columns,
810        }
811    }
812
813    /// Gets a [`StructType`] containing [`StructField`]s of the given names. The order of fields in
814    /// the returned schema will match the order passed to this function, which can be different
815    /// from this order in this schema. Returns an Err if a specified field doesn't exist.
816    pub fn project_as_struct(&self, names: &[impl AsRef<str>]) -> DeltaResult<StructType> {
817        let fields = names.iter().map(|name| {
818            self.fields
819                .get(name.as_ref())
820                .cloned()
821                .ok_or_else(|| Error::missing_column(name.as_ref()))
822        });
823        Self::try_from_results(fields)
824    }
825
826    /// Gets a [`SchemaRef`] containing [`StructField`]s of the given names. The order of fields in
827    /// the returned schema will match the order passed to this function, which can be different
828    /// from this order in this schema. Returns an Err if a specified field doesn't exist.
829    pub fn project(&self, names: &[impl AsRef<str>]) -> DeltaResult<SchemaRef> {
830        let struct_type = self.project_as_struct(names)?;
831        Ok(Arc::new(struct_type))
832    }
833
834    /// Adds fields to this [`StructType`], returning a new [`StructType`].
835    pub fn add(&self, fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
836        Self::try_new(self.fields.values().cloned().chain(fields))
837    }
838
839    /// Adds a predefined metadata column to this [`StructType`], returning a new [`StructType`].
840    pub fn add_metadata_column(
841        &self,
842        name: impl Into<String>,
843        spec: MetadataColumnSpec,
844    ) -> DeltaResult<Self> {
845        self.add([StructField::create_metadata_column(name, spec)])
846    }
847
848    /// Returns the index of the field with the given name, or None if not found.
849    pub fn index_of(&self, name: impl AsRef<str>) -> Option<usize> {
850        self.fields.get_index_of(name.as_ref())
851    }
852
853    /// Returns the index of the metadata column with the given spec, or None if not found.
854    pub fn index_of_metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&usize> {
855        self.metadata_columns.get(spec)
856    }
857
858    /// Checks if the [`StructType`] contains a field with the specified name.
859    pub fn contains(&self, name: impl AsRef<str>) -> bool {
860        self.fields.contains_key(name.as_ref())
861    }
862
863    /// Checks if the [`StructType`] contains a metadata column with the given spec.
864    pub fn contains_metadata_column(&self, spec: &MetadataColumnSpec) -> bool {
865        self.metadata_columns.contains_key(spec)
866    }
867
868    /// Gets the field with the given name.
869    pub fn field(&self, name: impl AsRef<str>) -> Option<&StructField> {
870        self.fields.get(name.as_ref())
871    }
872
873    /// Resolves a column path through nested structs, returning references to all
874    /// [`StructField`]s along the path. The last element is the leaf field.
875    ///
876    /// Each element of the path must resolve to a field in the current struct. All intermediate
877    /// (non-leaf) fields must be struct types.
878    ///
879    /// Returns an error if the path is empty, a field is not found, or an intermediate
880    /// field is not a struct type.
881    #[internal_api]
882    pub(crate) fn walk_column_fields<'a>(
883        &'a self,
884        col: &ColumnName,
885    ) -> DeltaResult<Vec<&'a StructField>> {
886        self.walk_column_fields_by(col, |s, name| s.field(name))
887    }
888
889    /// Helper to walk through nested columns. For each path component in `col`, calls
890    ///
891    /// `find_field(current_struct, component)` to locate the matching field, then descends
892    ///
893    /// into the next nested struct. Returns references to all [`StructField`]s along the path.
894    pub(crate) fn walk_column_fields_by<'a, F>(
895        &'a self,
896        col: &ColumnName,
897        find_field: F,
898    ) -> DeltaResult<Vec<&'a StructField>>
899    where
900        F: for<'b> Fn(&'b StructType, &str) -> Option<&'b StructField>,
901    {
902        let path = col.path();
903        if path.is_empty() {
904            return Err(Error::generic("Column path cannot be empty"));
905        }
906        let mut current_struct = self;
907        let mut fields = Vec::with_capacity(path.len());
908        for (i, field_name) in path.iter().enumerate() {
909            let field = find_field(current_struct, field_name).ok_or_else(|| {
910                Error::generic(format!(
911                    "Could not resolve column '{col}': field '{field_name}' not found in schema"
912                ))
913            })?;
914            fields.push(field);
915            if i < path.len() - 1 {
916                let DataType::Struct(inner) = field.data_type() else {
917                    return Err(Error::generic(format!(
918                        "Cannot resolve column '{col}': intermediate field '{field_name}' \
919                         is not a struct type"
920                    )));
921                };
922                current_struct = inner;
923            }
924        }
925        Ok(fields)
926    }
927
928    /// Gets the field with the given name and its index.
929    pub fn field_with_index(&self, name: impl AsRef<str>) -> Option<(usize, &StructField)> {
930        self.fields
931            .get_full(name.as_ref())
932            .map(|(index, _, field)| (index, field))
933    }
934
935    /// Gets the field at the given index.
936    pub fn field_at_index(&self, index: usize) -> Option<&StructField> {
937        self.fields.get_index(index).map(|(_, field)| field)
938    }
939
940    /// Gets a reference to all the fields in this struct type.
941    pub fn fields(
942        &self,
943    ) -> impl ExactSizeIterator<Item = &StructField> + DoubleEndedIterator + FusedIterator {
944        self.fields.values()
945    }
946
947    /// Gets an iterator over all the fields in this struct type.
948    pub fn into_fields(
949        self,
950    ) -> impl ExactSizeIterator<Item = StructField> + DoubleEndedIterator + FusedIterator {
951        self.fields.into_values()
952    }
953
954    /// Gets a mutable reference to the underlying field map.
955    pub(crate) fn field_map_mut(&mut self) -> &mut IndexMap<String, StructField> {
956        &mut self.fields
957    }
958
959    /// Walk a pre-segmented column path through this schema and return the leaf field.
960    ///
961    /// `path` is the path's individual name segments (one per nesting level), already split by
962    /// the caller. Lookup at each level is case-insensitive. The Delta protocol uses `.` as the
963    /// dotted-path separator at the API surface, but `field_at_path` itself does not split --
964    /// callers typically pass [`ColumnName::path()`](crate::expressions::ColumnName::path),
965    /// which yields the segments directly.
966    ///
967    /// Panics if any segment is missing or an intermediate field is not a struct. Intended for
968    /// use in test assertions.
969    ///
970    /// # Example
971    ///
972    /// ```ignore
973    /// // Schema:
974    /// //   id:      INTEGER  not null
975    /// //   address: STRUCT { city: STRING not null, zip: STRING }
976    /// let path = vec!["address".to_string(), "city".to_string()];
977    /// let city = schema.field_at_path(&path);
978    /// assert_eq!(city.name(), "city");
979    /// assert!(!city.is_nullable());
980    /// ```
981    #[cfg(any(test, feature = "test-utils"))]
982    #[allow(clippy::panic, clippy::expect_used)]
983    pub fn field_at_path<'a>(&'a self, path: &[String]) -> &'a StructField {
984        fn find_ci<'a>(
985            mut fields: impl Iterator<Item = &'a StructField>,
986            name: &str,
987        ) -> &'a StructField {
988            let lowered = name.to_lowercase();
989            fields
990                .find(|f| f.name().to_lowercase() == lowered)
991                .unwrap_or_else(|| panic!("field '{name}' not found"))
992        }
993        let (first, rest) = path.split_first().expect("non-empty path");
994        let mut field = find_ci(self.fields(), first);
995        for seg in rest {
996            let DataType::Struct(s) = field.data_type() else {
997                panic!("expected struct at intermediate segment '{seg}'");
998            };
999            field = find_ci(s.fields(), seg);
1000        }
1001        field
1002    }
1003
1004    /// Gets all the field names in this struct type in the order they are defined.
1005    pub fn field_names(&self) -> impl ExactSizeIterator<Item = &String> {
1006        self.fields.keys()
1007    }
1008
1009    /// Gets the number of fields in this struct type.
1010    pub fn num_fields(&self) -> usize {
1011        // O(1) for indexmap
1012        self.fields.len()
1013    }
1014
1015    /// Recursively counts all [`StructField`] nodes in this schema tree.
1016    ///
1017    /// This includes nested struct fields (inside Struct, Array, and Map types) but does not
1018    /// count Array/Map containers themselves. This matches the traversal pattern used by
1019    /// `assign_column_mapping_metadata` when assigning column IDs, so the result equals the
1020    /// expected `delta.columnMapping.maxColumnId` for a newly created table.
1021    #[allow(unused)] // Only used by integration tests (create_table/column_mapping.rs)
1022    #[internal_api]
1023    pub(crate) fn total_struct_fields(&self) -> usize {
1024        fn count_data_type(dt: &DataType) -> usize {
1025            match dt {
1026                DataType::Struct(inner) => inner.total_struct_fields(),
1027                DataType::Array(array) => count_data_type(array.element_type()),
1028                DataType::Map(map) => {
1029                    count_data_type(map.key_type()) + count_data_type(map.value_type())
1030                }
1031                _ => 0,
1032            }
1033        }
1034        self.fields()
1035            .map(|field| 1 + count_data_type(field.data_type()))
1036            .sum()
1037    }
1038
1039    /// Gets a reference to the metadata column with the given spec.
1040    pub fn metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&StructField> {
1041        self.metadata_columns
1042            .get(spec)
1043            .and_then(|index| self.fields.get_index(*index).map(|(_, field)| field))
1044    }
1045
1046    /// Gets an iterator over all the metadata columns in this struct type.
1047    pub fn metadata_columns(&self) -> impl Iterator<Item = &StructField> {
1048        self.metadata_columns
1049            .values()
1050            .filter_map(|index| self.fields.get_index(*index).map(|(_, field)| field))
1051    }
1052
1053    /// Extracts the name and type of all leaf columns, in schema order. Caller should pass Some
1054    /// `own_name` if this schema is embedded in a larger struct (e.g. `add.*`) and None if the
1055    /// schema is a top-level result (e.g. `*`).
1056    ///
1057    /// NOTE: This method only traverses through `StructType` fields; `MapType` and `ArrayType`
1058    /// fields are considered leaves even if they contain `StructType` entries/elements.
1059    #[allow(unused)]
1060    #[internal_api]
1061    pub(crate) fn leaves<'s>(&self, own_name: impl Into<Option<&'s str>>) -> ColumnNamesAndTypes {
1062        let mut get_leaves = GetSchemaLeaves::new(own_name.into());
1063        get_leaves.transform_struct(self);
1064        (get_leaves.names, get_leaves.types).into()
1065    }
1066
1067    /// Applies physical name mappings to this field. If the `column_mapping_mode` is
1068    /// [`ColumnMappingMode::Id`], then each StructField will have its parquet field id in the
1069    /// [`ColumnMetadataKey::ParquetFieldId`] metadata field.
1070    ///
1071    /// Uses a single transformer so duplicate column mapping IDs are detected across all
1072    /// fields in this struct, not just within each field's subtree.
1073    #[internal_api]
1074    pub(crate) fn make_physical(
1075        &self,
1076        column_mapping_mode: ColumnMappingMode,
1077    ) -> DeltaResult<Self> {
1078        let mut transformer = MakePhysical::new(column_mapping_mode);
1079        transformer.transform_struct(self).map(|s| s.into_owned())
1080    }
1081
1082    /// Validates that there are no metadata columns in the given fields.
1083    pub(crate) fn ensure_no_metadata_columns(
1084        fields: &mut dyn Iterator<Item = &StructField>,
1085    ) -> DeltaResult<()> {
1086        for field in fields {
1087            Self::ensure_no_metadata_columns_in_field(field)?;
1088        }
1089        Ok(())
1090    }
1091
1092    /// Validates that there are no metadata columns in the given field.
1093    pub(crate) fn ensure_no_metadata_columns_in_field(field: &StructField) -> DeltaResult<()> {
1094        if field.is_metadata_column() {
1095            return Err(Error::schema(
1096                "Metadata columns are only allowed at the top level of a schema".to_string(),
1097            ));
1098        }
1099
1100        match &field.data_type {
1101            DataType::Struct(struct_type) => {
1102                // Only check leaf fields; nested structs were validated at their creation
1103                Self::ensure_no_metadata_columns(&mut struct_type.fields().filter(|f| {
1104                    !matches!(f.data_type, DataType::Struct(_) | DataType::Variant(_))
1105                }))?;
1106            }
1107            DataType::Array(array_type) => {
1108                if let DataType::Struct(struct_type) = array_type.element_type() {
1109                    Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1110                }
1111            }
1112            DataType::Map(map_type) => {
1113                if let DataType::Struct(struct_type) = map_type.key_type() {
1114                    Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1115                }
1116                if let DataType::Struct(struct_type) = map_type.value_type() {
1117                    Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1118                }
1119            }
1120            // Primitive types cannot contain nested metadata columns and variant types are
1121            // validated at creation
1122            DataType::Primitive(_) | DataType::Variant(_) => {}
1123        };
1124
1125        Ok(())
1126    }
1127
1128    /// Returns a StructType with `new_field` inserted after the field named `after`.
1129    /// If `new_field`  already presents in the schema, an error is returned.
1130    /// If `after` is None, `new_field` is appended to the end.
1131    /// If `after` is not found, an error is returned.
1132    pub fn with_field_inserted_after(
1133        mut self,
1134        after: Option<&str>,
1135        new_field: StructField,
1136    ) -> DeltaResult<Self> {
1137        // TODO: Upgrade to a case-insensitive duplicate check when this method is used for
1138        // user-facing operations like ALTER TABLE ADD COLUMN. Currently only used internally
1139        // for inserting protocol-defined fields (e.g. stats_parsed) where exact-name matching
1140        // is sufficient.
1141        if self.fields.contains_key(&new_field.name) {
1142            return Err(Error::generic(format!(
1143                "Field {} already exists",
1144                new_field.name
1145            )));
1146        }
1147
1148        let insert_index = after
1149            .map(|after| {
1150                self.fields
1151                    .get_index_of(after)
1152                    .map(|index| index + 1)
1153                    .ok_or_else(|| Error::generic(format!("Field {after} not found")))
1154            })
1155            .unwrap_or_else(|| Ok(self.fields.len()))?;
1156
1157        self.fields
1158            .insert_before(insert_index, new_field.name.clone(), new_field);
1159        Ok(self)
1160    }
1161
1162    /// Returns a StructType with `new_field` inserted before the field named `before`.
1163    /// If `new_field` already presents in the schema, an error is returned.
1164    /// If `before` is None, `new_field` is inserted at the beginning.
1165    /// If `before` is not found, an error is returned.
1166    pub fn with_field_inserted_before(
1167        mut self,
1168        before: Option<&str>,
1169        new_field: StructField,
1170    ) -> DeltaResult<Self> {
1171        // TODO: Upgrade to a case-insensitive duplicate check when this method is used for
1172        // user-facing operations like ALTER TABLE ADD COLUMN. Currently only used internally
1173        // for inserting protocol-defined fields where exact-name matching is sufficient.
1174        if self.fields.contains_key(&new_field.name) {
1175            return Err(Error::generic(format!(
1176                "Field {} already exists",
1177                new_field.name
1178            )));
1179        }
1180
1181        let index_of_before = before
1182            .map(|before| {
1183                self.fields
1184                    .get_index_of(before)
1185                    .ok_or_else(|| Error::generic(format!("Field {before} not found")))
1186            })
1187            .unwrap_or_else(|| Ok(0))?;
1188
1189        self.fields
1190            .insert_before(index_of_before, new_field.name.clone(), new_field);
1191        Ok(self)
1192    }
1193
1194    /// Returns a StructType with the named field removed.
1195    /// Returns self unchanged if field doesn't exist.
1196    pub fn with_field_removed(mut self, name: &str) -> Self {
1197        self.fields.shift_remove(name);
1198        self
1199    }
1200
1201    /// Returns a new [`StructType`] containing only the top-level fields for which `predicate`
1202    /// returns `true`. This does not recurse into nested [`StructType`] fields.
1203    pub fn with_fields_filtered(
1204        &self,
1205        predicate: impl Fn(&StructField) -> bool,
1206    ) -> DeltaResult<Self> {
1207        Self::try_new(self.fields().filter(|f| predicate(f)).cloned())
1208    }
1209
1210    /// Returns an optional [`StructType`] containing only the top-level fields for which
1211    /// `predicate` returns `true`.
1212    ///
1213    /// This is a convenience wrapper around [`StructType::with_fields_filtered`] for callers
1214    /// that treat an empty top-level struct as "no schema".
1215    pub fn with_fields_filtered_nonempty(
1216        &self,
1217        predicate: impl Fn(&StructField) -> bool,
1218    ) -> DeltaResult<Option<Self>> {
1219        let filtered = self.with_fields_filtered(predicate)?;
1220        if filtered.num_fields() == 0 {
1221            Ok(None)
1222        } else {
1223            Ok(Some(filtered))
1224        }
1225    }
1226
1227    /// Returns a StructType with the named field replaced.
1228    /// Returns an error if field doesn't exist.
1229    pub fn with_field_replaced(
1230        mut self,
1231        name: &str,
1232        new_field: StructField,
1233    ) -> DeltaResult<StructType> {
1234        let replace_field = self
1235            .fields
1236            .get_mut(name)
1237            .ok_or_else(|| Error::generic(format!("Field {name} not found")))?;
1238
1239        *replace_field = new_field;
1240        Ok(self)
1241    }
1242}
1243
1244fn write_indent(f: &mut Formatter<'_>, levels: &[bool]) -> std::fmt::Result {
1245    let mut it = levels.iter().peekable();
1246
1247    while let Some(is_last) = it.next() {
1248        // Final level → draw branch
1249        if it.peek().is_none() {
1250            write!(f, "{}", if *is_last { "└─" } else { "├─" })?;
1251        }
1252        // Parent levels → vertical line or empty space
1253        else {
1254            write!(f, "{}", if *is_last { "   " } else { "│  " })?;
1255        }
1256    }
1257
1258    Ok(())
1259}
1260
1261fn write_struct_type(
1262    st: &StructType,
1263    f: &mut Formatter<'_>,
1264    levels: &mut Vec<bool>,
1265) -> std::fmt::Result {
1266    let len = st.fields.len();
1267
1268    for (i, (_, field)) in st.fields.iter().enumerate() {
1269        let is_last = i + 1 == len;
1270        levels.push(is_last);
1271
1272        write_indent(f, levels)?;
1273        writeln!(f, "{field}")?;
1274
1275        field.data_type.fmt_recursive(f, levels)?;
1276
1277        levels.pop();
1278    }
1279    Ok(())
1280}
1281
1282impl Display for StructType {
1283    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1284        writeln!(f, "{}:", self.type_name)?;
1285        let mut levels = Vec::new();
1286        write_struct_type(self, f, &mut levels)
1287    }
1288}
1289
1290impl IntoIterator for StructType {
1291    type Item = StructField;
1292    type IntoIter = StructFieldIntoIter;
1293
1294    fn into_iter(self) -> Self::IntoIter {
1295        StructFieldIntoIter {
1296            inner: self.fields.into_values(),
1297        }
1298    }
1299}
1300
1301impl<'a> IntoIterator for &'a StructType {
1302    type Item = &'a StructField;
1303    type IntoIter = StructFieldRefIter<'a>;
1304
1305    fn into_iter(self) -> Self::IntoIter {
1306        StructFieldRefIter {
1307            inner: self.fields.values(),
1308        }
1309    }
1310}
1311
1312/// An iterator that yields owned [`StructField`]s from a [`StructType`].
1313///
1314/// This iterator is returned by the [`IntoIterator`] implementation for [`StructType`] and
1315/// consumes the original struct. It yields each field in the order they were defined in the
1316/// schema, preserving the insertion order maintained by the underlying [`IndexMap`].
1317///
1318/// # Examples
1319///
1320/// ```
1321/// # use buoyant_kernel as delta_kernel;
1322/// # use delta_kernel::Error;
1323/// use delta_kernel::schema::{StructType, StructField, DataType};
1324///
1325/// let fields = vec![
1326///     StructField::new("name", DataType::STRING, false),
1327///     StructField::new("age", DataType::INTEGER, true),
1328/// ];
1329/// let struct_type = StructType::try_new(fields)?;
1330///
1331/// // Consume the struct_type and iterate over owned fields
1332/// for field in struct_type {
1333///     println!("Field: {} ({})", field.name(), field.data_type());
1334/// }
1335/// # Ok::<(), Error>(())
1336/// ```
1337///
1338/// [`IndexMap`]: indexmap::IndexMap
1339#[derive(Debug)]
1340pub struct StructFieldIntoIter {
1341    inner: indexmap::map::IntoValues<String, StructField>,
1342}
1343
1344impl Iterator for StructFieldIntoIter {
1345    type Item = StructField;
1346
1347    fn next(&mut self) -> Option<Self::Item> {
1348        self.inner.next()
1349    }
1350
1351    fn size_hint(&self) -> (usize, Option<usize>) {
1352        self.inner.size_hint()
1353    }
1354
1355    fn count(self) -> usize {
1356        self.inner.count()
1357    }
1358
1359    fn last(self) -> Option<Self::Item> {
1360        self.inner.last()
1361    }
1362
1363    fn nth(&mut self, n: usize) -> Option<Self::Item> {
1364        self.inner.nth(n)
1365    }
1366}
1367
1368impl ExactSizeIterator for StructFieldIntoIter {
1369    fn len(&self) -> usize {
1370        self.inner.len()
1371    }
1372}
1373
1374impl FusedIterator for StructFieldIntoIter {}
1375
1376impl DoubleEndedIterator for StructFieldIntoIter {
1377    fn next_back(&mut self) -> Option<Self::Item> {
1378        self.inner.next_back()
1379    }
1380}
1381
1382/// An iterator that yields references to [`StructField`]s from a [`StructType`].
1383///
1384/// This iterator is returned by the [`IntoIterator`] implementation for `&StructType` and by
1385/// the [`StructType::fields()`] method. Unlike [`StructFieldIntoIter`], this iterator does not
1386/// consume the original struct and yields references to the fields. It preserves the insertion
1387/// order maintained by the underlying [`IndexMap`].
1388///
1389/// This iterator implements [`Clone`], allowing you to create multiple independent iterators
1390/// over the same set of fields.
1391///
1392/// # Examples
1393///
1394/// ```
1395/// # use buoyant_kernel as delta_kernel;
1396/// # use delta_kernel::Error;
1397/// use delta_kernel::schema::{StructType, StructField, DataType};
1398///
1399/// let fields = vec![
1400///     StructField::new("name", DataType::STRING, false),
1401///     StructField::new("age", DataType::INTEGER, true),
1402/// ];
1403/// let struct_type = StructType::try_new(fields)?;
1404///
1405/// // Iterate over field references without consuming the struct_type
1406/// for field in &struct_type {
1407///     println!("Field: {} ({})", field.name(), field.data_type());
1408/// }
1409///
1410/// // struct_type is still available for use
1411/// assert_eq!(struct_type.field("name").unwrap().name(), "name");
1412///
1413/// // Or use the fields() method explicitly
1414/// for field in struct_type.fields() {
1415///     println!("Field type: {}", field.data_type());
1416/// }
1417/// # Ok::<(), Error>(())
1418/// ```
1419///
1420/// [`StructType::fields()`]: StructType::fields
1421/// [`IndexMap`]: indexmap::IndexMap
1422#[derive(Debug, Clone)]
1423pub struct StructFieldRefIter<'a> {
1424    inner: indexmap::map::Values<'a, String, StructField>,
1425}
1426
1427impl<'a> Iterator for StructFieldRefIter<'a> {
1428    type Item = &'a StructField;
1429
1430    fn next(&mut self) -> Option<Self::Item> {
1431        self.inner.next()
1432    }
1433
1434    fn size_hint(&self) -> (usize, Option<usize>) {
1435        self.inner.size_hint()
1436    }
1437}
1438
1439impl ExactSizeIterator for StructFieldRefIter<'_> {
1440    fn len(&self) -> usize {
1441        self.inner.len()
1442    }
1443}
1444
1445impl FusedIterator for StructFieldRefIter<'_> {}
1446
1447impl DoubleEndedIterator for StructFieldRefIter<'_> {
1448    fn next_back(&mut self) -> Option<Self::Item> {
1449        self.inner.next_back()
1450    }
1451}
1452
1453struct InvariantChecker;
1454
1455impl<'a> SchemaTransform<'a> for InvariantChecker {
1456    transform_output_type!(|'a, T| Result<(), ()>);
1457
1458    fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1459        if field.has_invariants() {
1460            Err(())
1461        } else {
1462            self.recurse_into_struct_field(field)
1463        }
1464    }
1465}
1466
1467/// Checks if any column in the schema (including nested columns) has invariants defined.
1468///
1469/// This traverses the entire schema to check for the presence of the `delta.invariants`
1470/// metadata key.
1471pub(crate) fn schema_has_invariants(schema: &Schema) -> bool {
1472    InvariantChecker.transform_struct(schema).is_err()
1473}
1474
1475/// Visitor that reports whether any non-null (`nullable: false`) field exists in a schema.
1476/// Walks the full schema tree including nested struct, array element, and map value structs.
1477struct NonNullFieldChecker;
1478
1479impl<'a> SchemaTransform<'a> for NonNullFieldChecker {
1480    transform_output_type!(|'a, T| Result<(), ()>);
1481
1482    /// Skip recursion into variant internals. The `metadata` and `value` fields inside a
1483    /// `Variant` are protocol-defined, always non-null, and not user-controlled, so they
1484    /// must not be treated as user-declared non-null columns.
1485    fn transform_variant(&mut self, _stype: &'a StructType) -> Result<(), ()> {
1486        Ok(())
1487    }
1488
1489    fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1490        if !field.is_nullable() {
1491            return Err(());
1492        }
1493
1494        self.recurse_into_struct_field(field)
1495    }
1496}
1497
1498/// Checks if any user-controlled column in the schema (including nested columns) is declared
1499/// non-null (`nullable: false`).
1500///
1501/// Skips `Variant` internal struct fields, which are protocol-defined and always non-null.
1502pub(crate) fn schema_contains_non_null_fields(schema: &Schema) -> bool {
1503    NonNullFieldChecker.transform_struct(schema).is_err()
1504}
1505
1506/// Normalizes column name field names to match the casing in the schema.
1507///
1508/// Walks each field name through the schema's struct hierarchy, replacing user-provided
1509/// casing with the schema's canonical casing. If a field name isn't found
1510/// case-insensitively, keeps the original (subsequent validation catches it).
1511///
1512/// For example, given schema `{ Id: int, Name: string }` and user-provided columns
1513/// `["id", "name"]`, returns `["Id", "Name"]` -- matching the schema's canonical casing.
1514///
1515/// Note: Must be called before validation (`validate_partition_columns` or
1516/// `validate_clustering_columns`) so that case-normalized names match the schema.
1517pub(crate) fn normalize_column_names_to_schema_casing(
1518    schema: &StructType,
1519    columns: &[ColumnName],
1520) -> Vec<ColumnName> {
1521    columns
1522        .iter()
1523        .map(|col| {
1524            let path = col.path();
1525            let mut normalized: Vec<String> = Vec::with_capacity(path.len());
1526            let mut current_schema = schema;
1527            for (i, field_name) in path.iter().enumerate() {
1528                match current_schema
1529                    .fields()
1530                    .find(|f| f.name().eq_ignore_ascii_case(field_name))
1531                {
1532                    Some(f) => {
1533                        normalized.push(f.name().to_string());
1534                        if let DataType::Struct(inner) = f.data_type() {
1535                            current_schema = inner;
1536                        }
1537                    }
1538                    None => {
1539                        // Field name not found at this level -- keep remaining path
1540                        // unchanged so validation reports the user's original input.
1541                        normalized.extend(path[i..].iter().cloned());
1542                        break;
1543                    }
1544                }
1545            }
1546            ColumnName::new(normalized.iter().map(|s| s.as_str()))
1547        })
1548        .collect()
1549}
1550
1551/// Helper for RowVisitor implementations
1552#[internal_api]
1553#[derive(Clone, Default)]
1554pub(crate) struct ColumnNamesAndTypes(Vec<ColumnName>, Vec<DataType>);
1555impl ColumnNamesAndTypes {
1556    #[internal_api]
1557    pub(crate) fn as_ref(&self) -> (&[ColumnName], &[DataType]) {
1558        (&self.0, &self.1)
1559    }
1560}
1561
1562impl From<(Vec<ColumnName>, Vec<DataType>)> for ColumnNamesAndTypes {
1563    fn from((names, fields): (Vec<ColumnName>, Vec<DataType>)) -> Self {
1564        ColumnNamesAndTypes(names, fields)
1565    }
1566}
1567
1568#[derive(Debug, Deserialize, Serialize)]
1569#[serde(rename_all = "camelCase")]
1570struct StructTypeSerDeHelper {
1571    #[serde(rename = "type")]
1572    type_name: String,
1573    fields: Vec<StructField>,
1574}
1575
1576impl Serialize for StructType {
1577    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1578    where
1579        S: serde::Serializer,
1580    {
1581        StructTypeSerDeHelper {
1582            type_name: self.type_name.clone(),
1583            fields: self.fields.values().cloned().collect(),
1584        }
1585        .serialize(serializer)
1586    }
1587}
1588
1589impl<'de> Deserialize<'de> for StructType {
1590    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1591    where
1592        D: serde::Deserializer<'de>,
1593        Self: Sized,
1594    {
1595        let helper = StructTypeSerDeHelper::deserialize(deserializer)?;
1596        StructType::try_new(helper.fields).map_err(serde::de::Error::custom)
1597    }
1598}
1599
1600#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
1601#[serde(rename_all = "camelCase")]
1602pub struct ArrayType {
1603    #[serde(rename = "type")]
1604    pub type_name: String,
1605    /// The type of element stored in this array
1606    pub element_type: DataType,
1607    /// Denoting whether this array can contain one or more null values
1608    pub contains_null: bool,
1609}
1610
1611impl ArrayType {
1612    pub fn new(element_type: DataType, contains_null: bool) -> Self {
1613        Self {
1614            type_name: "array".into(),
1615            element_type,
1616            contains_null,
1617        }
1618    }
1619
1620    #[inline]
1621    pub const fn element_type(&self) -> &DataType {
1622        &self.element_type
1623    }
1624
1625    #[inline]
1626    pub const fn contains_null(&self) -> bool {
1627        self.contains_null
1628    }
1629}
1630
1631#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
1632#[serde(rename_all = "camelCase")]
1633pub struct MapType {
1634    #[serde(rename = "type")]
1635    pub type_name: String,
1636    /// The type of element used for the key of this map
1637    pub key_type: DataType,
1638    /// The type of element used for the value of this map
1639    pub value_type: DataType,
1640    /// Denoting whether this map can contain one or more null values
1641    #[serde(default = "default_true")]
1642    pub value_contains_null: bool,
1643}
1644
1645impl MapType {
1646    pub fn new(
1647        key_type: impl Into<DataType>,
1648        value_type: impl Into<DataType>,
1649        value_contains_null: bool,
1650    ) -> Self {
1651        Self {
1652            type_name: "map".into(),
1653            key_type: key_type.into(),
1654            value_type: value_type.into(),
1655            value_contains_null,
1656        }
1657    }
1658
1659    #[inline]
1660    pub const fn key_type(&self) -> &DataType {
1661        &self.key_type
1662    }
1663
1664    #[inline]
1665    pub const fn value_type(&self) -> &DataType {
1666        &self.value_type
1667    }
1668
1669    #[inline]
1670    pub const fn value_contains_null(&self) -> bool {
1671        self.value_contains_null
1672    }
1673
1674    /// Create a schema assuming the map is stored as a struct with the specified key and value
1675    /// field names
1676    pub fn as_struct_schema(&self, key_name: String, val_name: String) -> Schema {
1677        StructType::new_unchecked([
1678            StructField::not_null(key_name, self.key_type.clone()),
1679            StructField::new(val_name, self.value_type.clone(), self.value_contains_null),
1680        ])
1681    }
1682}
1683
1684fn default_true() -> bool {
1685    true
1686}
1687
1688#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
1689pub struct DecimalType {
1690    precision: u8,
1691    scale: u8,
1692}
1693
1694impl DecimalType {
1695    /// Check if the given precision and scale are valid for a decimal type.
1696    pub fn try_new(precision: u8, scale: u8) -> DeltaResult<Self> {
1697        require!(
1698            0 < precision && precision <= 38,
1699            Error::invalid_decimal(format!(
1700                "precision must be in range 1..38 inclusive, found: {precision}."
1701            ))
1702        );
1703        require!(
1704            scale <= precision,
1705            Error::invalid_decimal(format!(
1706                "scale must be in range 0..{precision} inclusive, found: {scale}."
1707            ))
1708        );
1709        Ok(Self { precision, scale })
1710    }
1711
1712    pub fn precision(&self) -> u8 {
1713        self.precision
1714    }
1715
1716    pub fn scale(&self) -> u8 {
1717        self.scale
1718    }
1719}
1720
1721#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
1722#[serde(rename_all = "camelCase")]
1723pub enum PrimitiveType {
1724    /// UTF-8 encoded string of characters
1725    String,
1726    /// i64: 8-byte signed integer. Range: -9223372036854775808 to 9223372036854775807
1727    Long,
1728    /// i32: 4-byte signed integer. Range: -2147483648 to 2147483647
1729    Integer,
1730    /// i16: 2-byte signed integer numbers. Range: -32768 to 32767
1731    Short,
1732    /// i8: 1-byte signed integer number. Range: -128 to 127
1733    Byte,
1734    /// f32: 4-byte single-precision floating-point numbers
1735    Float,
1736    /// f64: 8-byte double-precision floating-point numbers
1737    Double,
1738    /// bool: boolean values
1739    Boolean,
1740    Binary,
1741    Date,
1742    /// Microsecond precision timestamp, adjusted to UTC.
1743    Timestamp,
1744    #[serde(rename = "timestamp_ntz")]
1745    TimestampNtz,
1746    #[cfg(feature = "nanosecond-timestamps")]
1747    #[serde(rename = "timestamp_nanos")]
1748    TimestampNanos,
1749    #[serde(serialize_with = "serialize_decimal", untagged)]
1750    Decimal(DecimalType),
1751}
1752
1753impl PrimitiveType {
1754    pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
1755        Ok(DecimalType::try_new(precision, scale)?.into())
1756    }
1757
1758    /// Returns `true` if this primitive type can be widened to the `target` type.
1759    ///
1760    /// Widening rules:
1761    /// - Integer widening: byte -> short -> int -> long (Delta protocol type widening)
1762    /// - Float widening: float -> double (Delta protocol type widening)
1763    /// - Timestamp interchangeability: Timestamp <-> TimestampNtz (both are i64 microseconds since
1764    ///   epoch, differing only in timezone semantics; this is a physical read accommodation, not a
1765    ///   Delta protocol type widening rule)
1766    #[internal_api]
1767    pub(crate) fn can_widen_to(&self, target: &Self) -> bool {
1768        use PrimitiveType::*;
1769        matches!(
1770            (self, target),
1771            // Integer widening: smaller types can be read as larger ones
1772            (Byte, Short | Integer | Long)
1773                | (Short, Integer | Long)
1774                | (Integer, Long)
1775                // Float widening: float can be read as double
1776                | (Float, Double)
1777                // Timestamp equivalence: both are i64 microseconds since epoch, differing only
1778                // in timezone semantics. The parquet representation is identical, so reading
1779                // one as the other is safe at the data layer.
1780                | (Timestamp, TimestampNtz)
1781                | (TimestampNtz, Timestamp)
1782        )
1783    }
1784
1785    /// Returns `true` if `self` is a physical integer type that some checkpoint writers
1786    /// produce when they omit Parquet logical type annotations for date or timestamp columns.
1787    ///
1788    /// Specifically:
1789    /// - Integer -> Date (int32 stored without DATE annotation)
1790    /// - Long -> Timestamp/TimestampNtz (int64 stored without TIMESTAMP annotation)
1791    ///
1792    /// These are **not** Delta protocol type widening rules and must not be used outside of
1793    /// checkpoint compatibility checks.
1794    ///
1795    /// NOTE: The Arrow-level equivalent lives in `check_cast_compat` in
1796    /// `engine/ensure_data_types.rs`. Changes here must be mirrored there.
1797    pub(crate) fn is_checkpoint_cast_compatible(&self, target: &Self) -> bool {
1798        matches!(
1799            (self, target),
1800            (Self::Integer, Self::Date) | (Self::Long, Self::Timestamp | Self::TimestampNtz)
1801        )
1802    }
1803
1804    /// Returns `true` if this primitive type is compatible with `target` for reading
1805    /// `stats_parsed` columns from checkpoint parquet files.
1806    ///
1807    /// This is a superset of [`can_widen_to`]: it includes all Delta protocol type widening
1808    /// rules plus physical Parquet encoding accommodations for checkpoint interop (see
1809    /// [`is_checkpoint_cast_compatible`]).
1810    ///
1811    /// [`can_widen_to`]: PrimitiveType::can_widen_to
1812    /// [`is_checkpoint_cast_compatible`]: PrimitiveType::is_checkpoint_cast_compatible
1813    pub(crate) fn is_stats_type_compatible_with(&self, target: &Self) -> bool {
1814        self == target || self.can_widen_to(target) || self.is_checkpoint_cast_compatible(target)
1815    }
1816}
1817
1818fn serialize_decimal<S: serde::Serializer>(
1819    dtype: &DecimalType,
1820    serializer: S,
1821) -> Result<S::Ok, S::Error> {
1822    serializer.serialize_str(&format!("decimal({},{})", dtype.precision(), dtype.scale()))
1823}
1824
1825fn serialize_variant<S: serde::Serializer>(
1826    _: &StructType,
1827    serializer: S,
1828) -> Result<S::Ok, S::Error> {
1829    serializer.serialize_str("variant")
1830}
1831
1832// Custom Deserialize to provide clear error messages for unsupported types.
1833// The derived impl would produce: "unknown variant `interval second`, expected one of ..."
1834// This impl produces: "Unsupported Delta table type: 'interval second'"
1835impl<'de> serde::Deserialize<'de> for PrimitiveType {
1836    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1837    where
1838        D: serde::Deserializer<'de>,
1839    {
1840        let str_value = String::deserialize(deserializer)?;
1841
1842        match str_value.as_str() {
1843            "string" => Ok(PrimitiveType::String),
1844            "long" => Ok(PrimitiveType::Long),
1845            "integer" => Ok(PrimitiveType::Integer),
1846            "short" => Ok(PrimitiveType::Short),
1847            "byte" => Ok(PrimitiveType::Byte),
1848            "float" => Ok(PrimitiveType::Float),
1849            "double" => Ok(PrimitiveType::Double),
1850            "boolean" => Ok(PrimitiveType::Boolean),
1851            "binary" => Ok(PrimitiveType::Binary),
1852            "date" => Ok(PrimitiveType::Date),
1853            "timestamp" => Ok(PrimitiveType::Timestamp),
1854            "timestamp_ntz" => Ok(PrimitiveType::TimestampNtz),
1855            #[cfg(feature = "nanosecond-timestamps")]
1856            "timestamp_nanos" => Ok(PrimitiveType::TimestampNanos),
1857            decimal_str if decimal_str.starts_with("decimal(") && decimal_str.ends_with(')') => {
1858                // Parse decimal type
1859                let mut parts = decimal_str[8..decimal_str.len() - 1].split(',');
1860                let precision = parts
1861                    .next()
1862                    .and_then(|part| part.trim().parse::<u8>().ok())
1863                    .ok_or_else(|| {
1864                        serde::de::Error::custom(format!(
1865                            "Invalid precision in decimal: {decimal_str}"
1866                        ))
1867                    })?;
1868                let scale = parts
1869                    .next()
1870                    .and_then(|part| part.trim().parse::<u8>().ok())
1871                    .ok_or_else(|| {
1872                        serde::de::Error::custom(format!("Invalid scale in decimal: {decimal_str}"))
1873                    })?;
1874                // Reject extra parts (e.g., decimal(10,2,99))
1875                if parts.next().is_some() {
1876                    return Err(serde::de::Error::custom(format!(
1877                        "Invalid decimal format (expected 2 parts): {decimal_str}"
1878                    )));
1879                }
1880                DecimalType::try_new(precision, scale)
1881                    .map(PrimitiveType::Decimal)
1882                    .map_err(serde::de::Error::custom)
1883            }
1884            unsupported => Err(serde::de::Error::custom(format!(
1885                "Unsupported Delta table type: '{unsupported}'"
1886            ))),
1887        }
1888    }
1889}
1890
1891impl Display for PrimitiveType {
1892    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1893        match self {
1894            PrimitiveType::String => write!(f, "string"),
1895            PrimitiveType::Long => write!(f, "long"),
1896            PrimitiveType::Integer => write!(f, "integer"),
1897            PrimitiveType::Short => write!(f, "short"),
1898            PrimitiveType::Byte => write!(f, "byte"),
1899            PrimitiveType::Float => write!(f, "float"),
1900            PrimitiveType::Double => write!(f, "double"),
1901            PrimitiveType::Boolean => write!(f, "boolean"),
1902            PrimitiveType::Binary => write!(f, "binary"),
1903            PrimitiveType::Date => write!(f, "date"),
1904            PrimitiveType::Timestamp => write!(f, "timestamp"),
1905            PrimitiveType::TimestampNtz => write!(f, "timestamp_ntz"),
1906            #[cfg(feature = "nanosecond-timestamps")]
1907            PrimitiveType::TimestampNanos => write!(f, "timestamp_nanos"),
1908            PrimitiveType::Decimal(dtype) => {
1909                write!(f, "decimal({},{})", dtype.precision(), dtype.scale())
1910            }
1911        }
1912    }
1913}
1914
1915#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
1916#[serde(untagged, rename_all = "camelCase")]
1917pub enum DataType {
1918    /// UTF-8 encoded string of characters
1919    Primitive(PrimitiveType),
1920    /// An array stores a variable length collection of items of some type.
1921    Array(Box<ArrayType>),
1922    /// A struct is used to represent both the top-level schema of the table as well
1923    /// as struct columns that contain nested columns.
1924    Struct(Box<StructType>),
1925    /// A map stores an arbitrary length collection of key-value pairs
1926    /// with a single keyType and a single valueType
1927    Map(Box<MapType>),
1928    /// The Variant data type. The physical representation can be flexible to support shredded
1929    /// reads. The unshredded schema is `Variant(StructType<metadata: BINARY, value: BINARY>)`.
1930    #[serde(serialize_with = "serialize_variant")]
1931    Variant(Box<StructType>),
1932}
1933
1934impl From<DecimalType> for PrimitiveType {
1935    fn from(dtype: DecimalType) -> Self {
1936        PrimitiveType::Decimal(dtype)
1937    }
1938}
1939impl From<DecimalType> for DataType {
1940    fn from(dtype: DecimalType) -> Self {
1941        PrimitiveType::from(dtype).into()
1942    }
1943}
1944impl From<PrimitiveType> for DataType {
1945    fn from(ptype: PrimitiveType) -> Self {
1946        DataType::Primitive(ptype)
1947    }
1948}
1949impl From<MapType> for DataType {
1950    fn from(map_type: MapType) -> Self {
1951        DataType::Map(Box::new(map_type))
1952    }
1953}
1954
1955impl From<StructType> for DataType {
1956    fn from(struct_type: StructType) -> Self {
1957        DataType::Struct(Box::new(struct_type))
1958    }
1959}
1960
1961impl From<ArrayType> for DataType {
1962    fn from(array_type: ArrayType) -> Self {
1963        DataType::Array(Box::new(array_type))
1964    }
1965}
1966
1967impl From<SchemaRef> for DataType {
1968    fn from(schema: SchemaRef) -> Self {
1969        Arc::unwrap_or_clone(schema).into()
1970    }
1971}
1972
1973// Custom Deserialize to preserve error messages from PrimitiveType.
1974// Serde's untagged enum only reports the last variant's error, discarding PrimitiveType's
1975// clear "Unsupported Delta table type: 'X'" message. We deserialize to Value first, then
1976// dispatch based on structure (string -> Primitive/Variant, object -> Array/Struct/Map).
1977impl<'de> serde::Deserialize<'de> for DataType {
1978    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1979    where
1980        D: serde::Deserializer<'de>,
1981    {
1982        use serde::de::Error;
1983        use serde_json::Value;
1984
1985        let value = Value::deserialize(deserializer)?;
1986
1987        // String values are either primitive types or "variant"
1988        if let Value::String(s) = &value {
1989            if s == "variant" {
1990                return match DataType::unshredded_variant() {
1991                    DataType::Variant(st) => Ok(DataType::Variant(st)),
1992                    _ => Err(Error::custom("Failed to create variant type")),
1993                };
1994            }
1995
1996            // Try PrimitiveType - this will give us good error messages for unsupported types
1997            return PrimitiveType::deserialize(value.clone())
1998                .map(DataType::Primitive)
1999                .map_err(|e| Error::custom(e.to_string()));
2000        }
2001
2002        // Object values are complex types - dispatch based on "type" field
2003        if let Value::Object(map) = &value {
2004            if let Some(Value::String(type_str)) = map.get("type") {
2005                return match type_str.as_str() {
2006                    "array" => ArrayType::deserialize(value)
2007                        .map(|at| DataType::Array(Box::new(at)))
2008                        .map_err(|e| Error::custom(e.to_string())),
2009                    "struct" => StructType::deserialize(value)
2010                        .map(|st| DataType::Struct(Box::new(st)))
2011                        .map_err(|e| Error::custom(e.to_string())),
2012                    "map" => MapType::deserialize(value)
2013                        .map(|mt| DataType::Map(Box::new(mt)))
2014                        .map_err(|e| Error::custom(e.to_string())),
2015                    _ => Err(Error::custom(format!("Unknown complex type: '{type_str}'"))),
2016                };
2017            }
2018        }
2019
2020        // Fallback error with the actual value that failed
2021        Err(Error::custom(format!(
2022            "Invalid data type: {}",
2023            serde_json::to_string(&value).unwrap_or_else(|_| format!("{value:?}"))
2024        )))
2025    }
2026}
2027
2028/// cbindgen:ignore
2029impl DataType {
2030    pub const STRING: Self = DataType::Primitive(PrimitiveType::String);
2031    pub const LONG: Self = DataType::Primitive(PrimitiveType::Long);
2032    pub const INTEGER: Self = DataType::Primitive(PrimitiveType::Integer);
2033    pub const SHORT: Self = DataType::Primitive(PrimitiveType::Short);
2034    pub const BYTE: Self = DataType::Primitive(PrimitiveType::Byte);
2035    pub const FLOAT: Self = DataType::Primitive(PrimitiveType::Float);
2036    pub const DOUBLE: Self = DataType::Primitive(PrimitiveType::Double);
2037    pub const BOOLEAN: Self = DataType::Primitive(PrimitiveType::Boolean);
2038    pub const BINARY: Self = DataType::Primitive(PrimitiveType::Binary);
2039    pub const DATE: Self = DataType::Primitive(PrimitiveType::Date);
2040    pub const TIMESTAMP: Self = DataType::Primitive(PrimitiveType::Timestamp);
2041    pub const TIMESTAMP_NTZ: Self = DataType::Primitive(PrimitiveType::TimestampNtz);
2042    #[cfg(feature = "nanosecond-timestamps")]
2043    pub const TIMESTAMP_NANOS: Self = DataType::Primitive(PrimitiveType::TimestampNanos);
2044    /// Create a new decimal type with the given precision and scale.
2045    pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
2046        Ok(PrimitiveType::decimal(precision, scale)?.into())
2047    }
2048
2049    /// Create a new struct type with the given fields.
2050    pub fn try_struct_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2051        Ok(StructType::try_new(fields)?.into())
2052    }
2053
2054    /// Create a new struct type from a fallible iterator of fields.
2055    pub fn try_struct_type_from_results<E: Into<Error>>(
2056        fields: impl IntoIterator<Item = Result<StructField, E>>,
2057    ) -> DeltaResult<Self> {
2058        StructType::try_from_results(fields).map(Self::from)
2059    }
2060
2061    /// Create a new struct type with the given fields without validating them.
2062    pub(crate) fn struct_type_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
2063        StructType::new_unchecked(fields).into()
2064    }
2065
2066    /// Create a new unshredded [`DataType::Variant`]. This data type is a struct of two not-null
2067    /// binary fields: `metadata` and `value`.
2068    pub fn unshredded_variant() -> Self {
2069        DataType::Variant(Box::new(StructType::new_unchecked([
2070            StructField::not_null("metadata", DataType::BINARY),
2071            StructField::not_null("value", DataType::BINARY),
2072        ])))
2073    }
2074
2075    /// Create a new [`DataType::Variant`] from the provided fields. For unshredded variants, you
2076    /// should prefer using [`DataType::unshredded_variant`].
2077    pub fn variant_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2078        // Different from regular StructTypes, Variants are not allowed to contain metadata columns
2079        // at all, so we also need to check their top-level primitive types.
2080        Ok(DataType::Variant(Box::new(StructType::try_from_results(
2081            fields.into_iter().map(|field| {
2082                if field.is_metadata_column() {
2083                    Err(Error::schema(
2084                        "Metadata columns are not allowed in Variant types".to_string(),
2085                    ))
2086                } else {
2087                    Ok(field)
2088                }
2089            }),
2090        )?)))
2091    }
2092
2093    /// Attempt to convert this data type to a [`PrimitiveType`]. Returns `None` if this is a
2094    /// non-primitive type.
2095    pub fn as_primitive_opt(&self) -> Option<&PrimitiveType> {
2096        match self {
2097            DataType::Primitive(ptype) => Some(ptype),
2098            _ => None,
2099        }
2100    }
2101
2102    fn fmt_recursive(&self, f: &mut Formatter<'_>, levels: &mut Vec<bool>) -> std::fmt::Result {
2103        match self {
2104            DataType::Struct(inner) => write_struct_type(inner, f, levels),
2105
2106            DataType::Array(inner) => {
2107                levels.push(true); // only one child → last
2108                write_indent(f, levels)?;
2109                writeln!(f, "array_element: {}", inner.element_type)?;
2110                inner.element_type.fmt_recursive(f, levels)?;
2111                levels.pop();
2112                Ok(())
2113            }
2114
2115            DataType::Map(inner) => {
2116                // key
2117                levels.push(false); // map_key is NOT last
2118                write_indent(f, levels)?;
2119                writeln!(f, "map_key: {}", inner.key_type)?;
2120                inner.key_type.fmt_recursive(f, levels)?;
2121                levels.pop();
2122
2123                // value
2124                levels.push(true); // map_value IS last at this level
2125                write_indent(f, levels)?;
2126                writeln!(f, "map_value: {}", inner.value_type)?;
2127                inner.value_type.fmt_recursive(f, levels)?;
2128                levels.pop();
2129                Ok(())
2130            }
2131
2132            _ => Ok(()),
2133        }
2134    }
2135}
2136
2137impl Display for DataType {
2138    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2139        match self {
2140            DataType::Primitive(p) => write!(f, "{p}"),
2141            DataType::Array(a) => write!(f, "array<{}>", a.element_type),
2142            DataType::Struct(s) => {
2143                write!(f, "struct<")?;
2144                for (i, field) in s.fields().enumerate() {
2145                    if i > 0 {
2146                        write!(f, ", ")?;
2147                    }
2148                    write!(f, "{}: {}", field.name, field.data_type)?;
2149                }
2150                write!(f, ">")
2151            }
2152            DataType::Map(m) => write!(f, "map<{}, {}>", m.key_type, m.value_type),
2153            DataType::Variant(_) => write!(f, "variant"),
2154        }
2155    }
2156}
2157
2158struct GetSchemaLeaves {
2159    path: Vec<String>,
2160    names: Vec<ColumnName>,
2161    types: Vec<DataType>,
2162}
2163impl GetSchemaLeaves {
2164    fn new(own_name: Option<&str>) -> Self {
2165        Self {
2166            path: own_name.into_iter().map(|s| s.to_string()).collect(),
2167            names: vec![],
2168            types: vec![],
2169        }
2170    }
2171}
2172
2173impl<'a> SchemaTransform<'a> for GetSchemaLeaves {
2174    transform_output_type!(|'a, T| ());
2175
2176    fn transform_struct_field(&mut self, field: &'a StructField) {
2177        self.path.push(field.name.clone());
2178        if let DataType::Struct(_) = field.data_type {
2179            self.recurse_into_struct_field(field);
2180        } else {
2181            self.names.push(ColumnName::new(&self.path));
2182            self.types.push(field.data_type.clone());
2183        }
2184        self.path.pop();
2185    }
2186}
2187
2188struct MakePhysical<'a> {
2189    column_mapping_mode: ColumnMappingMode,
2190    path: Vec<&'a str>,
2191    /// CM ids and physical names already claimed during the walk, with the first claimer.
2192    /// Threaded into `validate_and_extract_column_mapping_annotations` so duplicate IDs and
2193    /// duplicate `physicalName` values are rejected at the first collision.
2194    seen: SeenColumnMappingAnnotations<'a>,
2195}
2196impl<'a> MakePhysical<'a> {
2197    fn new(column_mapping_mode: ColumnMappingMode) -> Self {
2198        Self {
2199            column_mapping_mode,
2200            path: vec![],
2201            seen: SeenColumnMappingAnnotations::default(),
2202        }
2203    }
2204
2205    fn transform_inner<T>(
2206        &mut self,
2207        field_name: &'a str,
2208        transform: impl FnOnce(&mut Self) -> DeltaResult<T>,
2209    ) -> DeltaResult<T> {
2210        self.path.push(field_name);
2211        let result = transform(self);
2212        self.path.pop();
2213        result
2214    }
2215}
2216impl<'a> SchemaTransform<'a> for MakePhysical<'a> {
2217    transform_output_type!(|'a, T| DeltaResult<Cow<'a, T>>);
2218
2219    fn transform_array_element(&mut self, etype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
2220        self.transform_inner("<array element>", |this| this.transform(etype))
2221    }
2222    fn transform_map_key(&mut self, ktype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
2223        self.transform_inner("<map key>", |this| this.transform(ktype))
2224    }
2225    fn transform_map_value(&mut self, vtype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
2226        self.transform_inner("<map value>", |this| this.transform(vtype))
2227    }
2228    fn transform_struct_field(
2229        &mut self,
2230        field: &'a StructField,
2231    ) -> DeltaResult<Cow<'a, StructField>> {
2232        self.transform_inner(field.name(), |this| {
2233            let (physical_name, _id) = validate_and_extract_column_mapping_annotations(
2234                field,
2235                this.column_mapping_mode,
2236                &this.path,
2237                Some(&mut this.seen),
2238            )?;
2239
2240            if field.is_metadata_column() {
2241                return Ok(Cow::Borrowed(field));
2242            }
2243
2244            let field = this.recurse_into_struct_field(field)?;
2245
2246            let metadata = field.logical_to_physical_metadata(this.column_mapping_mode);
2247            let name = physical_name.to_owned();
2248
2249            Ok(Cow::Owned(field.with_name(name).with_metadata(metadata)))
2250        })
2251    }
2252
2253    fn transform_variant(&mut self, stype: &'a StructType) -> DeltaResult<Cow<'a, StructType>> {
2254        // There is no column mapping metadata inside the struct fields of a variant, so
2255        // we do not recurse into the variant fields
2256        Ok(Cow::Borrowed(stype))
2257    }
2258}
2259
2260#[cfg(test)]
2261mod tests {
2262    use rstest::rstest;
2263    use serde_json;
2264
2265    use super::*;
2266    use crate::table_features::ColumnMappingMode;
2267    use crate::utils::test_utils::{
2268        assert_result_error_with_message, test_deep_nested_schema_missing_leaf_cm,
2269    };
2270
2271    fn example_schema_metadata() -> &'static str {
2272        r#"
2273            {
2274                "name": "e",
2275                "type": {
2276                    "type": "array",
2277                    "elementType": {
2278                        "type": "struct",
2279                        "fields": [
2280                            {
2281                                "name": "d",
2282                                "type": "integer",
2283                                "nullable": false,
2284                                "metadata": {
2285                                    "delta.columnMapping.id": 5,
2286                                    "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
2287                                }
2288                            }
2289                        ]
2290                    },
2291                    "containsNull": true
2292                },
2293                "nullable": true,
2294                "metadata": {
2295                    "delta.columnMapping.id": 4,
2296                    "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1",
2297                    "delta.identity.start": 2147483648
2298                }
2299            }"#
2300    }
2301
2302    #[test]
2303    fn test_serde_data_types() {
2304        let data = r#"
2305        {
2306            "name": "a",
2307            "type": "integer",
2308            "nullable": false,
2309            "metadata": {}
2310        }
2311        "#;
2312        let field: StructField = serde_json::from_str(data).unwrap();
2313        assert!(matches!(field.data_type, DataType::INTEGER));
2314
2315        let data = r#"
2316        {
2317            "name": "c",
2318            "type": {
2319                "type": "array",
2320                "elementType": "integer",
2321                "containsNull": false
2322            },
2323            "nullable": true,
2324            "metadata": {}
2325        }
2326        "#;
2327        let field: StructField = serde_json::from_str(data).unwrap();
2328        assert!(matches!(field.data_type, DataType::Array(_)));
2329
2330        let data = r#"
2331        {
2332            "name": "e",
2333            "type": {
2334                "type": "array",
2335                "elementType": {
2336                    "type": "struct",
2337                    "fields": [
2338                        {
2339                            "name": "d",
2340                            "type": "integer",
2341                            "nullable": false,
2342                            "metadata": {}
2343                        }
2344                    ]
2345                },
2346                "containsNull": true
2347            },
2348            "nullable": true,
2349            "metadata": {}
2350        }
2351        "#;
2352        let field: StructField = serde_json::from_str(data).unwrap();
2353        assert!(matches!(field.data_type, DataType::Array(_)));
2354        match field.data_type {
2355            DataType::Array(array) => assert!(matches!(array.element_type, DataType::Struct(_))),
2356            _ => unreachable!(),
2357        }
2358
2359        let data = r#"
2360        {
2361            "name": "f",
2362            "type": {
2363                "type": "map",
2364                "keyType": "string",
2365                "valueType": "string",
2366                "valueContainsNull": true
2367            },
2368            "nullable": true,
2369            "metadata": {}
2370        }
2371        "#;
2372        let field: StructField = serde_json::from_str(data).unwrap();
2373        assert!(matches!(field.data_type, DataType::Map(_)));
2374    }
2375
2376    #[test]
2377    fn test_roundtrip_decimal() {
2378        let data = r#"
2379        {
2380            "name": "a",
2381            "type": "decimal(10, 2)",
2382            "nullable": false,
2383            "metadata": {}
2384        }
2385        "#;
2386        let field: StructField = serde_json::from_str(data).unwrap();
2387        assert_eq!(field.data_type, DataType::decimal(10, 2).unwrap());
2388
2389        let json_str = serde_json::to_string(&field).unwrap();
2390        assert_eq!(
2391            json_str,
2392            r#"{"name":"a","type":"decimal(10,2)","nullable":false,"metadata":{}}"#
2393        );
2394    }
2395
2396    #[test]
2397    fn test_roundtrip_variant() {
2398        let data = r#"
2399        {
2400            "name": "v",
2401            "type": "variant",
2402            "nullable": false,
2403            "metadata": {}
2404        }
2405        "#;
2406        let field: StructField = serde_json::from_str(data).unwrap();
2407        assert_eq!(field.data_type, DataType::unshredded_variant());
2408
2409        let json_str = serde_json::to_string(&field).unwrap();
2410        assert_eq!(
2411            json_str,
2412            r#"{"name":"v","type":"variant","nullable":false,"metadata":{}}"#
2413        );
2414    }
2415
2416    #[test]
2417    fn test_unshredded_variant() {
2418        let unshredded_variant_type = DataType::unshredded_variant();
2419
2420        match &unshredded_variant_type {
2421            DataType::Variant(struct_type) => {
2422                let fields: Vec<_> = struct_type.fields().collect();
2423                assert_eq!(fields.len(), 2);
2424
2425                assert_eq!(fields[0].name, "metadata");
2426                assert_eq!(fields[0].data_type, DataType::BINARY);
2427                assert!(!fields[0].nullable);
2428
2429                assert_eq!(fields[1].name, "value");
2430                assert_eq!(fields[1].data_type, DataType::BINARY);
2431                assert!(!fields[1].nullable);
2432            }
2433            _ => panic!("Expected DataType::Variant, got {unshredded_variant_type:?}"),
2434        }
2435    }
2436
2437    #[rstest]
2438    #[case("interval second")]
2439    #[case("interval day")]
2440    #[case("money")]
2441    fn test_unsupported_type_error_message(#[case] unsupported_type: &str) {
2442        let data = format!(
2443            r#"{{
2444                "name": "test_field",
2445                "type": "{unsupported_type}",
2446                "nullable": false,
2447                "metadata": {{}}
2448            }}"#
2449        );
2450        let result: Result<StructField, _> = serde_json::from_str(&data);
2451        assert!(result.is_err());
2452        let err = result.unwrap_err();
2453        let expected_msg = format!("Unsupported Delta table type: '{unsupported_type}'");
2454        assert!(
2455            err.to_string().contains(&expected_msg),
2456            "Expected error message about unsupported type '{unsupported_type}', got: {err}"
2457        );
2458    }
2459
2460    #[rstest]
2461    #[case("string", DataType::STRING)]
2462    #[case("long", DataType::LONG)]
2463    #[case("integer", DataType::INTEGER)]
2464    #[case("short", DataType::SHORT)]
2465    #[case("byte", DataType::BYTE)]
2466    #[case("float", DataType::FLOAT)]
2467    #[case("double", DataType::DOUBLE)]
2468    #[case("boolean", DataType::BOOLEAN)]
2469    #[case("binary", DataType::BINARY)]
2470    #[case("date", DataType::DATE)]
2471    #[case("timestamp", DataType::TIMESTAMP)]
2472    #[case("timestamp_ntz", DataType::TIMESTAMP_NTZ)]
2473    fn test_primitive_type_deserialization_still_works(
2474        #[case] type_str: &str,
2475        #[case] expected_type: DataType,
2476    ) {
2477        let data = format!(
2478            r#"{{
2479                "name": "test_field",
2480                "type": "{type_str}",
2481                "nullable": false,
2482                "metadata": {{}}
2483            }}"#
2484        );
2485        let field: StructField = serde_json::from_str(&data).unwrap();
2486        assert_eq!(field.data_type, expected_type);
2487    }
2488
2489    #[rstest]
2490    #[case(10, 2)]
2491    #[case(16, 4)]
2492    #[case(38, 10)]
2493    fn test_decimal_with_primitive_deserializer(#[case] precision: u8, #[case] scale: u8) {
2494        let data = format!(
2495            r#"{{
2496                "name": "test_decimal",
2497                "type": "decimal({precision},{scale})",
2498                "nullable": false,
2499                "metadata": {{}}
2500            }}"#
2501        );
2502        let field: StructField = serde_json::from_str(&data).unwrap();
2503        assert_eq!(
2504            field.data_type,
2505            DataType::decimal(precision, scale).unwrap()
2506        );
2507    }
2508
2509    #[rstest]
2510    #[case("decimal(invalid)", "Invalid precision in decimal")]
2511    #[case("decimal(10)", "Invalid scale in decimal")]
2512    #[case("decimal()", "Invalid precision in decimal")]
2513    #[case("decimal(10,2,99)", "Invalid decimal format (expected 2 parts)")]
2514    fn test_invalid_decimal_format(#[case] invalid_type: &str, #[case] expected_error: &str) {
2515        let data = format!(
2516            r#"{{
2517                "name": "invalid",
2518                "type": "{invalid_type}",
2519                "nullable": false,
2520                "metadata": {{}}
2521            }}"#
2522        );
2523        let result: Result<StructField, _> = serde_json::from_str(&data);
2524        assert!(result.is_err());
2525        let err = result.unwrap_err();
2526        assert!(
2527            err.to_string().contains(expected_error),
2528            "Expected error containing '{expected_error}', got: {err}"
2529        );
2530    }
2531
2532    #[rstest]
2533    #[case(
2534        r#"{"type": "array", "elementType": "integer", "containsNull": false}"#,
2535        DataType::Array(Box::new(ArrayType::new(DataType::INTEGER, false)))
2536    )]
2537    #[case(
2538        r#"{"type": "struct", "fields": [{"name": "a", "type": "integer", "nullable": false, "metadata": {}}, {"name": "b", "type": "string", "nullable": true, "metadata": {}}]}"#,
2539        DataType::Struct(Box::new(StructType::new_unchecked([
2540            StructField::new("a", DataType::INTEGER, false),
2541            StructField::new("b", DataType::STRING, true),
2542        ])))
2543    )]
2544    #[case(
2545        r#"{"type": "map", "keyType": "string", "valueType": "integer", "valueContainsNull": true}"#,
2546        DataType::Map(Box::new(MapType::new(DataType::STRING, DataType::INTEGER, true)))
2547    )]
2548    #[case("\"string\"", DataType::STRING)]
2549    #[case("\"long\"", DataType::LONG)]
2550    #[case("\"integer\"", DataType::INTEGER)]
2551    #[case("\"short\"", DataType::SHORT)]
2552    #[case("\"byte\"", DataType::BYTE)]
2553    #[case("\"float\"", DataType::FLOAT)]
2554    #[case("\"double\"", DataType::DOUBLE)]
2555    #[case("\"boolean\"", DataType::BOOLEAN)]
2556    #[case("\"binary\"", DataType::BINARY)]
2557    #[case("\"date\"", DataType::DATE)]
2558    #[case("\"timestamp\"", DataType::TIMESTAMP)]
2559    #[case("\"timestamp_ntz\"", DataType::TIMESTAMP_NTZ)]
2560    #[case("\"variant\"", DataType::unshredded_variant())]
2561    fn test_data_type_deserialization(#[case] type_json: &str, #[case] expected: DataType) {
2562        let data_type: DataType = serde_json::from_str(type_json).unwrap();
2563        assert_eq!(data_type, expected);
2564    }
2565
2566    #[test]
2567    fn test_make_physical_no_column_mapping() {
2568        let field = StructField::nullable(
2569            "e",
2570            ArrayType::new(
2571                StructType::new_unchecked([StructField::not_null("d", DataType::INTEGER)]).into(),
2572                true,
2573            ),
2574        );
2575        let physical_field = field.make_physical(ColumnMappingMode::None).unwrap();
2576
2577        assert_eq!(physical_field.name, "e");
2578        assert!(physical_field
2579            .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2580            .is_none());
2581        assert!(physical_field
2582            .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
2583            .is_none());
2584
2585        let DataType::Array(atype) = physical_field.data_type else {
2586            panic!("Expected an Array");
2587        };
2588        let DataType::Struct(stype) = atype.element_type else {
2589            panic!("Expected a Struct");
2590        };
2591        let struct_field = stype.fields.get_index(0).unwrap().1;
2592        assert_eq!(struct_field.name, "d");
2593    }
2594
2595    #[test]
2596    fn test_make_physical_rejects_annotated_fields_when_column_mapping_disabled() {
2597        let data = example_schema_metadata();
2598        let field: StructField = serde_json::from_str(data).unwrap();
2599        assert!(field.make_physical(ColumnMappingMode::None).is_err());
2600    }
2601
2602    #[test]
2603    fn test_make_physical_rejects_unannotated_leaf_in_deep_nesting() {
2604        let schema = test_deep_nested_schema_missing_leaf_cm();
2605        let field = schema.fields().next().unwrap();
2606        let err = field
2607            .make_physical(ColumnMappingMode::Name)
2608            .unwrap_err()
2609            .to_string();
2610        assert!(
2611            err.contains("top.`<array element>`.mid_field.`<map value>`.leaf"),
2612            "Expected full nested path in error, got: {err}"
2613        );
2614    }
2615
2616    #[test]
2617    fn test_make_physical_rejects_duplicate_column_mapping_ids() {
2618        use crate::schema::ColumnMetadataKey;
2619
2620        fn cm_field(name: &str, id: i64, data_type: impl Into<DataType>) -> StructField {
2621            StructField::not_null(name, data_type).with_metadata([
2622                (
2623                    ColumnMetadataKey::ColumnMappingId.as_ref(),
2624                    MetadataValue::Number(id),
2625                ),
2626                (
2627                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2628                    MetadataValue::String(format!("col-{name}")),
2629                ),
2630            ])
2631        }
2632
2633        let inner = StructType::new_unchecked([
2634            cm_field("x", 3, DataType::INTEGER),
2635            cm_field("y", 4, DataType::STRING),
2636        ]);
2637        let schema = StructType::new_unchecked([
2638            cm_field("a", 1, DataType::INTEGER),
2639            cm_field(
2640                "b",
2641                2,
2642                ArrayType::new(DataType::Struct(Box::new(inner)), true),
2643            ),
2644            cm_field("c", 3, DataType::STRING),
2645        ]);
2646        assert_result_error_with_message(
2647            schema.make_physical(ColumnMappingMode::Id),
2648            "Duplicate column mapping ID",
2649        );
2650    }
2651
2652    /// Two fields sharing the same `delta.columnMapping.physicalName` (at top level OR across
2653    /// nesting depth) must be rejected during `make_physical`. PROTOCOL.md requires
2654    /// `physicalName` to be a "globally unique identifier"; without dedup, two columns sharing
2655    /// a physical name would resolve ambiguously in parquet under `ColumnMappingMode::Name`.
2656    /// The walk runs from `TableConfiguration::try_new` so both CREATE and ALTER hit it.
2657    #[rstest]
2658    #[case::same_level("a", "a")]
2659    #[case::nested("a", "x")]
2660    fn test_make_physical_rejects_duplicate_physical_names(
2661        #[case] outer_name: &str,
2662        #[case] inner_name: &str,
2663    ) {
2664        use crate::schema::ColumnMetadataKey;
2665
2666        // Both fields advertise the SAME physicalName ("col-shared") with distinct ids
2667        // (so the duplicate-ID check doesn't fire first). For `same_level` both fields are
2668        // top-level siblings; for `nested` the second field lives one struct deeper to prove
2669        // the walker checks across nesting boundaries.
2670        fn cm_field(
2671            name: &str,
2672            id: i64,
2673            physical: &str,
2674            data_type: impl Into<DataType>,
2675        ) -> StructField {
2676            StructField::not_null(name, data_type).with_metadata([
2677                (
2678                    ColumnMetadataKey::ColumnMappingId.as_ref(),
2679                    MetadataValue::Number(id),
2680                ),
2681                (
2682                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2683                    MetadataValue::String(physical.to_string()),
2684                ),
2685            ])
2686        }
2687
2688        let schema = if outer_name == inner_name {
2689            StructType::new_unchecked([
2690                cm_field(outer_name, 1, "col-shared", DataType::INTEGER),
2691                cm_field("sibling", 2, "col-shared", DataType::STRING),
2692            ])
2693        } else {
2694            let inner = StructType::new_unchecked([cm_field(
2695                inner_name,
2696                2,
2697                "col-shared",
2698                DataType::INTEGER,
2699            )]);
2700            StructType::new_unchecked([
2701                cm_field(outer_name, 1, "col-shared", DataType::INTEGER),
2702                cm_field(
2703                    "nested_holder",
2704                    3,
2705                    "col-nested-holder",
2706                    DataType::Struct(Box::new(inner)),
2707                ),
2708            ])
2709        };
2710        assert_result_error_with_message(
2711            schema.make_physical(ColumnMappingMode::Name),
2712            "Duplicate `delta.columnMapping.physicalName` 'col-shared'",
2713        );
2714    }
2715
2716    #[test]
2717    fn test_make_physical_column_mapping() {
2718        [ColumnMappingMode::Name, ColumnMappingMode::Id]
2719            .into_iter()
2720            .for_each(|mode| {
2721                let data = example_schema_metadata();
2722
2723                let field: StructField = serde_json::from_str(data).unwrap();
2724
2725                let col_id = field
2726                    .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2727                    .unwrap();
2728                let id_start = field
2729                    .get_config_value(&ColumnMetadataKey::IdentityStart)
2730                    .unwrap();
2731                assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
2732                assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64));
2733                assert_eq!(
2734                    field.physical_name(mode),
2735                    "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2736                );
2737                let physical_field = field.make_physical(mode).unwrap();
2738
2739                // Parquet field id should only be present in id column mapping mode
2740                match mode {
2741                    ColumnMappingMode::Id => {
2742                        assert!(matches!(
2743                            physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2744                            Some(MetadataValue::Number(4))
2745                        ));
2746
2747                        assert!(matches!(
2748                            physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2749                            Some(MetadataValue::Number(4))
2750                        ));
2751                    }
2752                    ColumnMappingMode::Name => {
2753                        assert!(matches!(
2754                            physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2755                            Some(MetadataValue::Number(4))
2756                        ));
2757                        assert!(matches!(
2758                            physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2759                            Some(MetadataValue::Number(4))
2760                        ));
2761                    }
2762                    ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2763                }
2764
2765                assert_eq!(
2766                    physical_field.name,
2767                    "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2768                );
2769                let DataType::Array(atype) = physical_field.data_type else {
2770                    panic!("Expected an Array");
2771                };
2772                let DataType::Struct(stype) = atype.element_type else {
2773                    panic!("Expected a Struct");
2774                };
2775
2776                let struct_field = stype.fields.get_index(0).unwrap().1;
2777                assert_eq!(
2778                    struct_field.name,
2779                    "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
2780                );
2781
2782                // The subfield should also have ParquetFieldId present it column mapping id mode
2783                match mode {
2784                    ColumnMappingMode::Id => {
2785                        assert!(matches!(
2786                            struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2787                            Some(MetadataValue::Number(5))
2788                        ));
2789                        assert!(matches!(
2790                            struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2791                            Some(MetadataValue::Number(5))
2792                        ));
2793                    }
2794                    ColumnMappingMode::Name => {
2795                        assert!(matches!(
2796                            struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2797                            Some(MetadataValue::Number(5))
2798                        ));
2799                        assert!(matches!(
2800                            struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2801                            Some(MetadataValue::Number(5))
2802                        ));
2803                    }
2804                    ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2805                }
2806            });
2807    }
2808
2809    #[test]
2810    fn test_make_physical_passes_metadata_column_through() {
2811        let field = StructField::create_metadata_column(
2812            "_metadata.row_index",
2813            MetadataColumnSpec::RowIndex,
2814        );
2815        for mode in [
2816            ColumnMappingMode::None,
2817            ColumnMappingMode::Name,
2818            ColumnMappingMode::Id,
2819        ] {
2820            let physical = field.make_physical(mode).unwrap();
2821            assert_eq!(physical.name(), "_metadata.row_index");
2822            assert!(physical.is_metadata_column());
2823        }
2824    }
2825
2826    #[test]
2827    fn test_make_physical_rejects_metadata_column_with_cm_annotations() {
2828        let field = StructField::create_metadata_column(
2829            "_metadata.row_index",
2830            MetadataColumnSpec::RowIndex,
2831        )
2832        .add_metadata([(
2833            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2834            MetadataValue::String("phys".to_string()),
2835        )]);
2836        assert_result_error_with_message(
2837            field.make_physical(ColumnMappingMode::Name),
2838            "must not have column mapping annotations",
2839        );
2840    }
2841
2842    #[test]
2843    fn test_read_schemas() {
2844        let file = std::fs::File::open("./tests/serde/schema.json").unwrap();
2845        let schema: Result<Schema, _> = serde_json::from_reader(file);
2846        assert!(schema.is_ok());
2847
2848        let file = std::fs::File::open("./tests/serde/checkpoint_schema.json").unwrap();
2849        let schema: Result<Schema, _> = serde_json::from_reader(file);
2850        assert!(schema.is_ok())
2851    }
2852
2853    #[test]
2854    fn test_invalid_decimal() {
2855        let data = r#"
2856        {
2857            "name": "a",
2858            "type": "decimal(39, 10)",
2859            "nullable": false,
2860            "metadata": {}
2861        }
2862        "#;
2863        assert!(serde_json::from_str::<StructField>(data).is_err());
2864
2865        let data = r#"
2866        {
2867            "name": "a",
2868            "type": "decimal(10, 39)",
2869            "nullable": false,
2870            "metadata": {}
2871        }
2872        "#;
2873        assert!(serde_json::from_str::<StructField>(data).is_err());
2874    }
2875
2876    #[test]
2877    fn test_metadata_value_to_string() {
2878        assert_eq!(MetadataValue::Number(0).to_string(), "0");
2879        assert_eq!(
2880            MetadataValue::String("hello".to_string()).to_string(),
2881            "hello"
2882        );
2883        assert_eq!(MetadataValue::Boolean(true).to_string(), "true");
2884        assert_eq!(MetadataValue::Boolean(false).to_string(), "false");
2885        let object_json = serde_json::json!({ "an": "object" });
2886        assert_eq!(
2887            MetadataValue::Other(object_json).to_string(),
2888            "{\"an\":\"object\"}"
2889        );
2890        let array_json = serde_json::json!(["an", "array"]);
2891        assert_eq!(
2892            MetadataValue::Other(array_json).to_string(),
2893            "[\"an\",\"array\"]"
2894        );
2895    }
2896
2897    #[test]
2898    fn test_num_fields() {
2899        let schema = StructType::new_unchecked([]);
2900        assert!(schema.num_fields() == 0);
2901        let schema = StructType::new_unchecked([
2902            StructField::nullable("a", DataType::LONG),
2903            StructField::nullable("b", DataType::LONG),
2904            StructField::nullable("c", DataType::LONG),
2905            StructField::nullable("d", DataType::LONG),
2906        ]);
2907        assert_eq!(schema.num_fields(), 4);
2908        let schema = StructType::new_unchecked([
2909            StructField::nullable("b", DataType::LONG),
2910            StructField::not_null("b", DataType::LONG),
2911            StructField::nullable("c", DataType::LONG),
2912            StructField::nullable("c", DataType::LONG),
2913        ]);
2914        assert_eq!(schema.num_fields(), 2);
2915    }
2916
2917    #[test]
2918    fn test_has_invariants() {
2919        // Schema with no invariants
2920        let schema = StructType::new_unchecked([
2921            StructField::nullable("a", DataType::STRING),
2922            StructField::nullable("b", DataType::INTEGER),
2923        ]);
2924        assert!(!schema_has_invariants(&schema));
2925
2926        // Schema with top-level invariant
2927        let mut field = StructField::nullable("c", DataType::STRING);
2928        field.metadata.insert(
2929            ColumnMetadataKey::Invariants.as_ref().to_string(),
2930            MetadataValue::String("c > 0".to_string()),
2931        );
2932
2933        let schema =
2934            StructType::new_unchecked([StructField::nullable("a", DataType::STRING), field]);
2935        assert!(schema_has_invariants(&schema));
2936
2937        // Schema with nested invariant in a struct
2938        let nested_field = StructField::nullable(
2939            "nested_c",
2940            DataType::try_struct_type([{
2941                let mut field = StructField::nullable("d", DataType::INTEGER);
2942                field.metadata.insert(
2943                    ColumnMetadataKey::Invariants.as_ref().to_string(),
2944                    MetadataValue::String("d > 0".to_string()),
2945                );
2946                field
2947            }])
2948            .unwrap(),
2949        );
2950
2951        let schema = StructType::new_unchecked([
2952            StructField::nullable("a", DataType::STRING),
2953            StructField::nullable("b", DataType::INTEGER),
2954            nested_field,
2955        ]);
2956        assert!(schema_has_invariants(&schema));
2957
2958        // Schema with nested invariant in an array of structs
2959        let array_field = StructField::nullable(
2960            "array_field",
2961            ArrayType::new(
2962                DataType::try_struct_type([{
2963                    let mut field = StructField::nullable("d", DataType::INTEGER);
2964                    field.metadata.insert(
2965                        ColumnMetadataKey::Invariants.as_ref().to_string(),
2966                        MetadataValue::String("d > 0".to_string()),
2967                    );
2968                    field
2969                }])
2970                .unwrap(),
2971                true,
2972            ),
2973        );
2974
2975        let schema = StructType::new_unchecked([
2976            StructField::nullable("a", DataType::STRING),
2977            StructField::nullable("b", DataType::INTEGER),
2978            array_field,
2979        ]);
2980        assert!(schema_has_invariants(&schema));
2981
2982        // Schema with nested invariant in a map value that's a struct
2983        let map_field = StructField::nullable(
2984            "map_field",
2985            MapType::new(
2986                DataType::STRING,
2987                DataType::try_struct_type([{
2988                    let mut field = StructField::nullable("d", DataType::INTEGER);
2989                    field.metadata.insert(
2990                        ColumnMetadataKey::Invariants.as_ref().to_string(),
2991                        MetadataValue::String("d > 0".to_string()),
2992                    );
2993                    field
2994                }])
2995                .unwrap(),
2996                true,
2997            ),
2998        );
2999
3000        let schema = StructType::new_unchecked([
3001            StructField::nullable("a", DataType::STRING),
3002            StructField::nullable("b", DataType::INTEGER),
3003            map_field,
3004        ]);
3005        assert!(schema_has_invariants(&schema));
3006    }
3007
3008    fn all_nullable_schema() -> StructType {
3009        StructType::new_unchecked([
3010            StructField::nullable("a", DataType::STRING),
3011            StructField::nullable("b", DataType::INTEGER),
3012        ])
3013    }
3014
3015    fn top_level_non_null_schema() -> StructType {
3016        StructType::new_unchecked([
3017            StructField::not_null("id", DataType::INTEGER),
3018            StructField::nullable("name", DataType::STRING),
3019        ])
3020    }
3021
3022    fn nested_non_null_schema() -> StructType {
3023        let nested_field = StructField::nullable(
3024            "parent",
3025            DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)]).unwrap(),
3026        );
3027        StructType::new_unchecked([StructField::nullable("a", DataType::STRING), nested_field])
3028    }
3029
3030    fn array_non_null_schema() -> StructType {
3031        let array_field = StructField::nullable(
3032            "arr",
3033            ArrayType::new(
3034                DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3035                    .unwrap(),
3036                true,
3037            ),
3038        );
3039        StructType::new_unchecked([array_field])
3040    }
3041
3042    fn map_non_null_schema() -> StructType {
3043        let map_field = StructField::nullable(
3044            "map",
3045            MapType::new(
3046                DataType::STRING,
3047                DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3048                    .unwrap(),
3049                true,
3050            ),
3051        );
3052        StructType::new_unchecked([map_field])
3053    }
3054
3055    fn variant_only_schema() -> StructType {
3056        // Variant internal fields (metadata, value) are protocol-defined non-null but
3057        // must NOT be counted as user-controlled non-null fields.
3058        StructType::new_unchecked([StructField::nullable("v", DataType::unshredded_variant())])
3059    }
3060
3061    #[rstest]
3062    #[case::all_nullable(all_nullable_schema(), false)]
3063    #[case::top_level(top_level_non_null_schema(), true)]
3064    #[case::nested_struct(nested_non_null_schema(), true)]
3065    #[case::array_element(array_non_null_schema(), true)]
3066    #[case::map_value(map_non_null_schema(), true)]
3067    #[case::variant_skipped(variant_only_schema(), false)]
3068    fn test_schema_contains_non_null_fields(#[case] schema: StructType, #[case] expected: bool) {
3069        assert_eq!(schema_contains_non_null_fields(&schema), expected);
3070    }
3071
3072    #[test]
3073    fn test_struct_type_iterator_basic() {
3074        let fields = vec![
3075            StructField::new("field1", DataType::STRING, true),
3076            StructField::new("field2", DataType::INTEGER, false),
3077            StructField::new("field3", DataType::BOOLEAN, true),
3078        ];
3079        let struct_type = StructType::new_unchecked(fields.clone());
3080
3081        // Test fields() method returns reference iterator
3082        let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3083        assert_eq!(field_names, vec!["field1", "field2", "field3"]);
3084
3085        // Test that we can still access the struct_type after using fields()
3086        assert_eq!(struct_type.field("field1").unwrap().name, "field1");
3087    }
3088
3089    #[test]
3090    fn test_struct_type_into_iterator_owned() {
3091        let fields = vec![
3092            StructField::new("a", DataType::STRING, true),
3093            StructField::new("b", DataType::INTEGER, false),
3094        ];
3095        let struct_type = StructType::new_unchecked(fields);
3096
3097        // Test owned iteration (consumes the struct)
3098        let mut field_names = Vec::new();
3099        for field in struct_type {
3100            field_names.push(field.name);
3101        }
3102        assert_eq!(field_names, vec!["a", "b"]);
3103    }
3104
3105    #[test]
3106    fn test_struct_type_into_iterator_references() {
3107        let fields = vec![
3108            StructField::new("x", DataType::DOUBLE, true),
3109            StructField::new("y", DataType::FLOAT, false),
3110            StructField::new("z", DataType::LONG, true),
3111        ];
3112        let struct_type = StructType::new_unchecked(fields);
3113
3114        // Test reference iteration (does not consume the struct)
3115        let mut field_names = Vec::new();
3116        for field in &struct_type {
3117            field_names.push(field.name().clone());
3118        }
3119        assert_eq!(field_names, vec!["x", "y", "z"]);
3120
3121        // Should still be able to use struct_type after iteration
3122        assert_eq!(struct_type.field("x").unwrap().name, "x");
3123    }
3124
3125    #[test]
3126    fn test_iterator_exact_size() {
3127        let fields = vec![
3128            StructField::new("field1", DataType::STRING, true),
3129            StructField::new("field2", DataType::INTEGER, false),
3130            StructField::new("field3", DataType::BOOLEAN, true),
3131            StructField::new("field4", DataType::DATE, true),
3132        ];
3133
3134        // Test ExactSizeIterator for reference iterator
3135        let struct_type = StructType::new_unchecked(fields.clone());
3136        let ref_iter = struct_type.fields();
3137        assert_eq!(ref_iter.len(), 4);
3138
3139        // Test ExactSizeIterator for &StructType into_iter
3140        let struct_type = StructType::new_unchecked(fields.clone());
3141        let into_ref_iter = (&struct_type).into_iter();
3142        assert_eq!(into_ref_iter.len(), 4);
3143
3144        // Test ExactSizeIterator for StructType into_iter (consuming)
3145        let struct_type = StructType::new_unchecked(fields);
3146        let into_owned_iter = struct_type.into_iter();
3147        assert_eq!(into_owned_iter.len(), 4);
3148    }
3149
3150    #[test]
3151    fn test_iterator_with_metadata() {
3152        let field_with_metadata = StructField::new("test_field", DataType::STRING, true)
3153            .with_metadata([("key1", MetadataValue::String("value1".to_string()))]);
3154
3155        let struct_type = StructType::new_unchecked([field_with_metadata]);
3156
3157        // Test that metadata is preserved through iteration
3158        for field in &struct_type {
3159            assert_eq!(field.metadata().len(), 1);
3160            assert_eq!(
3161                field.metadata().get("key1"),
3162                Some(&MetadataValue::String("value1".to_string()))
3163            );
3164        }
3165
3166        // Test consuming iterator preserves metadata
3167        for field in struct_type {
3168            assert_eq!(field.metadata().len(), 1);
3169            assert_eq!(
3170                field.metadata().get("key1"),
3171                Some(&MetadataValue::String("value1".to_string()))
3172            );
3173        }
3174    }
3175
3176    #[test]
3177    fn test_empty_struct_type_iterator() {
3178        let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3179
3180        // Test all iterator methods with empty struct
3181        assert_eq!(struct_type.fields().count(), 0);
3182        assert_eq!((&struct_type).into_iter().count(), 0);
3183        assert_eq!(struct_type.into_iter().count(), 0);
3184    }
3185
3186    #[test]
3187    fn test_iterator_order_preservation() {
3188        let fields = vec![
3189            StructField::new("zebra", DataType::STRING, true),
3190            StructField::new("apple", DataType::INTEGER, false),
3191            StructField::new("banana", DataType::BOOLEAN, true),
3192        ];
3193        let struct_type = StructType::new_unchecked(fields);
3194
3195        // IndexMap should preserve insertion order
3196        let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3197        assert_eq!(field_names, vec!["zebra", "apple", "banana"]);
3198
3199        // Order should be the same across different iterator methods
3200        let ref_names: Vec<_> = (&struct_type).into_iter().map(|f| f.name()).collect();
3201        assert_eq!(ref_names, vec!["zebra", "apple", "banana"]);
3202
3203        // Test consuming iterator maintains order too
3204        let owned_names: Vec<_> = struct_type.into_iter().map(|f| f.name).collect();
3205        assert_eq!(owned_names, vec!["zebra", "apple", "banana"]);
3206    }
3207
3208    #[test]
3209    fn test_iterator_collect() {
3210        let original_fields = vec![
3211            StructField::new("field1", DataType::STRING, true),
3212            StructField::new("field2", DataType::INTEGER, false),
3213        ];
3214        let struct_type = StructType::new_unchecked(original_fields.clone());
3215
3216        // Test collecting from reference iterator
3217        let collected_refs: Vec<&StructField> = struct_type.fields().collect();
3218        assert_eq!(collected_refs.len(), 2);
3219        assert_eq!(collected_refs[0].name, "field1");
3220        assert_eq!(collected_refs[1].name, "field2");
3221
3222        // Test collecting from consuming iterator
3223        let collected_owned: Vec<StructField> = struct_type.into_iter().collect();
3224        assert_eq!(collected_owned.len(), 2);
3225        assert_eq!(collected_owned[0].name, "field1");
3226        assert_eq!(collected_owned[1].name, "field2");
3227    }
3228
3229    #[test]
3230    fn test_iterator_functional_methods() {
3231        let fields = vec![
3232            StructField::new("nullable_string", DataType::STRING, true),
3233            StructField::new("required_int", DataType::INTEGER, false),
3234            StructField::new("nullable_bool", DataType::BOOLEAN, true),
3235            StructField::new("required_long", DataType::LONG, false),
3236        ];
3237        let struct_type = StructType::new_unchecked(fields);
3238
3239        // Test filter - find nullable fields
3240        let nullable_count = struct_type.fields().filter(|f| f.is_nullable()).count();
3241        assert_eq!(nullable_count, 2);
3242
3243        // Test map and filter chain
3244        let required_field_names: Vec<_> = struct_type
3245            .fields()
3246            .filter(|f| !f.is_nullable())
3247            .map(|f| f.name())
3248            .collect();
3249        assert_eq!(required_field_names, vec!["required_int", "required_long"]);
3250
3251        // Test enumerate
3252        for (index, field) in struct_type.fields().enumerate() {
3253            match index {
3254                0 => assert_eq!(field.name, "nullable_string"),
3255                1 => assert_eq!(field.name, "required_int"),
3256                2 => assert_eq!(field.name, "nullable_bool"),
3257                3 => assert_eq!(field.name, "required_long"),
3258                _ => panic!("Unexpected field index: {index}"),
3259            }
3260        }
3261    }
3262
3263    #[test]
3264    fn test_double_ended_iterator_ref() {
3265        let fields = vec![
3266            StructField::new("first", DataType::STRING, true),
3267            StructField::new("second", DataType::INTEGER, false),
3268            StructField::new("third", DataType::BOOLEAN, true),
3269            StructField::new("fourth", DataType::LONG, false),
3270        ];
3271        let struct_type = StructType::new_unchecked(fields);
3272
3273        // Test iterating from both ends using reference iterator
3274        let mut iter = struct_type.fields();
3275
3276        // Forward iteration
3277        assert_eq!(iter.next().unwrap().name, "first");
3278        assert_eq!(iter.next().unwrap().name, "second");
3279
3280        // Backward iteration
3281        assert_eq!(iter.next_back().unwrap().name, "fourth");
3282        assert_eq!(iter.next_back().unwrap().name, "third");
3283
3284        // Should be exhausted
3285        assert!(iter.next().is_none());
3286        assert!(iter.next_back().is_none());
3287    }
3288
3289    #[test]
3290    fn test_double_ended_iterator_owned() {
3291        let fields = vec![
3292            StructField::new("alpha", DataType::STRING, true),
3293            StructField::new("beta", DataType::INTEGER, false),
3294            StructField::new("gamma", DataType::BOOLEAN, true),
3295        ];
3296        let struct_type = StructType::new_unchecked(fields);
3297
3298        // Test iterating from both ends using owned iterator
3299        let mut iter = struct_type.into_iter();
3300
3301        // Backward iteration first
3302        assert_eq!(iter.next_back().unwrap().name, "gamma");
3303
3304        // Forward iteration
3305        assert_eq!(iter.next().unwrap().name, "alpha");
3306
3307        // Backward iteration again
3308        assert_eq!(iter.next_back().unwrap().name, "beta");
3309
3310        // Should be exhausted
3311        assert!(iter.next().is_none());
3312        assert!(iter.next_back().is_none());
3313    }
3314
3315    #[test]
3316    fn test_double_ended_iterator_collect_reverse() {
3317        let fields = vec![
3318            StructField::new("one", DataType::STRING, true),
3319            StructField::new("two", DataType::INTEGER, false),
3320            StructField::new("three", DataType::BOOLEAN, true),
3321        ];
3322        let struct_type = StructType::new_unchecked(fields);
3323
3324        // Test collecting in reverse order using DoubleEndedIterator
3325        let reversed_names: Vec<_> = struct_type.fields().rev().map(|f| f.name()).collect();
3326        assert_eq!(reversed_names, vec!["three", "two", "one"]);
3327
3328        // Test that we can still use the original struct
3329        assert_eq!(struct_type.field("two").unwrap().name, "two");
3330    }
3331
3332    #[test]
3333    fn test_double_ended_iterator_with_into_iter_ref() {
3334        let fields = vec![
3335            StructField::new("x", DataType::DOUBLE, true),
3336            StructField::new("y", DataType::FLOAT, false),
3337            StructField::new("z", DataType::LONG, true),
3338        ];
3339        let struct_type = StructType::new_unchecked(fields);
3340
3341        // Test DoubleEndedIterator with &StructType into_iter
3342        let mut iter = (&struct_type).into_iter();
3343
3344        // Mix forward and backward iteration
3345        assert_eq!(iter.next().unwrap().name, "x");
3346        assert_eq!(iter.next_back().unwrap().name, "z");
3347        assert_eq!(iter.next().unwrap().name, "y");
3348
3349        // Should be exhausted
3350        assert!(iter.next().is_none());
3351        assert!(iter.next_back().is_none());
3352    }
3353
3354    #[test]
3355    fn test_fused_iterator_ref() {
3356        let fields = vec![
3357            StructField::new("test1", DataType::STRING, true),
3358            StructField::new("test2", DataType::INTEGER, false),
3359        ];
3360        let struct_type = StructType::new_unchecked(fields);
3361
3362        // Verify that reference iterator implements FusedIterator
3363        let mut iter = struct_type.fields();
3364
3365        // Exhaust the iterator
3366        assert!(iter.next().is_some());
3367        assert!(iter.next().is_some());
3368        assert!(iter.next().is_none());
3369
3370        // FusedIterator guarantees that calling next() after exhaustion always returns None
3371        assert!(iter.next().is_none());
3372        assert!(iter.next().is_none());
3373        assert!(iter.next().is_none());
3374    }
3375
3376    #[test]
3377    fn test_fused_iterator_owned() {
3378        let fields = vec![
3379            StructField::new("item1", DataType::STRING, true),
3380            StructField::new("item2", DataType::INTEGER, false),
3381        ];
3382        let struct_type = StructType::new_unchecked(fields);
3383
3384        // Verify that owned iterator implements FusedIterator
3385        let mut iter = struct_type.into_iter();
3386
3387        // Exhaust the iterator
3388        assert!(iter.next().is_some());
3389        assert!(iter.next().is_some());
3390        assert!(iter.next().is_none());
3391
3392        // FusedIterator guarantees that calling next() after exhaustion always returns None
3393        assert!(iter.next().is_none());
3394        assert!(iter.next().is_none());
3395        assert!(iter.next().is_none());
3396    }
3397
3398    #[test]
3399    fn test_fused_iterator_with_into_iter_ref() {
3400        let fields = vec![StructField::new("field_a", DataType::BOOLEAN, true)];
3401        let struct_type = StructType::new_unchecked(fields);
3402
3403        // Verify that &StructType into_iter implements FusedIterator
3404        let mut iter = (&struct_type).into_iter();
3405
3406        // Exhaust the iterator
3407        assert!(iter.next().is_some());
3408        assert!(iter.next().is_none());
3409
3410        // FusedIterator guarantees that calling next() after exhaustion always returns None
3411        assert!(iter.next().is_none());
3412        assert!(iter.next().is_none());
3413    }
3414
3415    #[test]
3416    fn test_fused_double_ended_iterator_empty() {
3417        let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3418
3419        // Test both forward and backward iteration on empty iterator
3420        let mut iter = struct_type.fields();
3421
3422        // Empty iterator should return None immediately
3423        assert!(iter.next().is_none());
3424        assert!(iter.next_back().is_none());
3425
3426        // FusedIterator guarantees continued None after exhaustion
3427        assert!(iter.next().is_none());
3428        assert!(iter.next_back().is_none());
3429    }
3430
3431    #[test]
3432    fn test_double_ended_iterator_single_element() {
3433        let fields = vec![StructField::new("single", DataType::STRING, true)];
3434        let struct_type = StructType::new_unchecked(fields);
3435
3436        // Test DoubleEndedIterator with single element
3437        let mut iter = struct_type.fields();
3438
3439        // Should get the single element from next()
3440        assert_eq!(iter.next().unwrap().name, "single");
3441        assert!(iter.next().is_none());
3442        assert!(iter.next_back().is_none());
3443
3444        // Test getting single element from next_back()
3445        let struct_type =
3446            StructType::new_unchecked([StructField::new("single2", DataType::INTEGER, false)]);
3447        let mut iter = struct_type.into_iter();
3448
3449        assert_eq!(iter.next_back().unwrap().name, "single2");
3450        assert!(iter.next().is_none());
3451        assert!(iter.next_back().is_none());
3452    }
3453
3454    #[test]
3455    fn test_metadata_column_spec() -> DeltaResult<()> {
3456        // Test text_value
3457        assert_eq!(MetadataColumnSpec::RowIndex.text_value(), "row_index");
3458        assert_eq!(MetadataColumnSpec::RowId.text_value(), "row_id");
3459        assert_eq!(
3460            MetadataColumnSpec::RowCommitVersion.text_value(),
3461            "row_commit_version"
3462        );
3463        assert_eq!(MetadataColumnSpec::FilePath.text_value(), "_file");
3464
3465        // Test data_type
3466        assert_eq!(MetadataColumnSpec::RowIndex.data_type(), DataType::LONG);
3467        assert_eq!(MetadataColumnSpec::RowId.data_type(), DataType::LONG);
3468        assert_eq!(
3469            MetadataColumnSpec::RowCommitVersion.data_type(),
3470            DataType::LONG
3471        );
3472        assert_eq!(MetadataColumnSpec::FilePath.data_type(), DataType::STRING);
3473
3474        // Test nullable
3475        assert!(!MetadataColumnSpec::RowIndex.nullable());
3476        assert!(!MetadataColumnSpec::RowId.nullable());
3477        assert!(!MetadataColumnSpec::RowCommitVersion.nullable());
3478        assert!(!MetadataColumnSpec::FilePath.nullable());
3479
3480        // Test reserved_field_id
3481        assert_eq!(MetadataColumnSpec::RowIndex.reserved_field_id(), None);
3482        assert_eq!(MetadataColumnSpec::RowId.reserved_field_id(), None);
3483        assert_eq!(
3484            MetadataColumnSpec::RowCommitVersion.reserved_field_id(),
3485            None
3486        );
3487        assert_eq!(
3488            MetadataColumnSpec::FilePath.reserved_field_id(),
3489            Some(crate::reserved_field_ids::FILE_NAME)
3490        );
3491
3492        // Test from_str
3493        assert_eq!(
3494            MetadataColumnSpec::from_str("row_index")?,
3495            MetadataColumnSpec::RowIndex
3496        );
3497        assert_eq!(
3498            MetadataColumnSpec::from_str("row_id")?,
3499            MetadataColumnSpec::RowId
3500        );
3501        assert_eq!(
3502            MetadataColumnSpec::from_str("row_commit_version")?,
3503            MetadataColumnSpec::RowCommitVersion
3504        );
3505        assert_eq!(
3506            MetadataColumnSpec::from_str("_file")?,
3507            MetadataColumnSpec::FilePath
3508        );
3509
3510        // Test invalid from_str
3511        assert!(MetadataColumnSpec::from_str("invalid").is_err());
3512
3513        Ok(())
3514    }
3515
3516    #[test]
3517    fn test_create_metadata_column() {
3518        let field =
3519            StructField::create_metadata_column("test_row_index", MetadataColumnSpec::RowIndex);
3520
3521        assert_eq!(field.name(), "test_row_index");
3522        assert_eq!(field.data_type(), &DataType::LONG);
3523        assert!(!field.nullable);
3524        assert!(field.is_metadata_column());
3525        assert_eq!(
3526            field.get_metadata_column_spec(),
3527            Some(MetadataColumnSpec::RowIndex)
3528        );
3529    }
3530
3531    #[test]
3532    fn test_default_row_index_column() {
3533        let field = StructField::default_row_index_column();
3534
3535        assert_eq!(field.name(), "_metadata.row_index");
3536        assert_eq!(field.data_type(), &DataType::LONG);
3537        assert!(!field.nullable);
3538        assert!(field.is_metadata_column());
3539        assert_eq!(
3540            field.get_metadata_column_spec(),
3541            Some(MetadataColumnSpec::RowIndex)
3542        );
3543    }
3544
3545    #[test]
3546    fn test_add_column() -> DeltaResult<()> {
3547        let schema = StructType::try_new([StructField::nullable("col1", DataType::STRING)])?;
3548
3549        let new_field = StructField::nullable("col2", DataType::INTEGER);
3550        let updated_schema = schema.add([new_field])?;
3551
3552        assert_eq!(updated_schema.fields().count(), 2);
3553        assert!(updated_schema.contains("col1"));
3554        assert!(updated_schema.contains("col2"));
3555        Ok(())
3556    }
3557
3558    #[test]
3559    fn test_add_metadata_column() -> DeltaResult<()> {
3560        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3561
3562        let schema_with_metadata =
3563            schema.add_metadata_column("my_row_index", MetadataColumnSpec::RowIndex)?;
3564
3565        assert_eq!(schema_with_metadata.fields().count(), 2);
3566        assert!(schema_with_metadata.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3567        assert!(schema_with_metadata.contains("my_row_index"));
3568        assert_eq!(
3569            schema_with_metadata.index_of_metadata_column(&MetadataColumnSpec::RowIndex),
3570            Some(&1)
3571        );
3572        Ok(())
3573    }
3574
3575    #[test]
3576    fn test_duplicate_metadata_columns() -> DeltaResult<()> {
3577        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3578
3579        let schema_with_metadata =
3580            schema.add_metadata_column("row_index1", MetadataColumnSpec::RowIndex)?;
3581
3582        // Adding another row index metadata column should fail
3583        let result =
3584            schema_with_metadata.add_metadata_column("row_index2", MetadataColumnSpec::RowIndex);
3585
3586        assert_result_error_with_message(result, "Duplicate metadata column");
3587        Ok(())
3588    }
3589
3590    #[test]
3591    fn test_duplicate_field_name_case_insensitive() {
3592        // Delta column names are case-insensitive per protocol; (Value, value) is invalid
3593        let result = StructType::try_new([
3594            StructField::nullable("Value", DataType::INTEGER),
3595            StructField::nullable("value", DataType::STRING),
3596        ]);
3597        assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3598    }
3599
3600    #[test]
3601    fn test_duplicate_field_name_exact() {
3602        // Exact duplicate (same name twice) is rejected via the case-insensitive check
3603        let result = StructType::try_new([
3604            StructField::nullable("id", DataType::INTEGER),
3605            StructField::nullable("id", DataType::STRING),
3606        ]);
3607        assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3608    }
3609
3610    #[test]
3611    fn test_nested_metadata_columns_validation_struct() -> DeltaResult<()> {
3612        // Test that metadata columns in nested structs are rejected
3613        let nested_field_with_metadata =
3614            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3615        let nested_struct = StructType {
3616            type_name: "struct".into(),
3617            fields: [(
3618                nested_field_with_metadata.name.clone(),
3619                nested_field_with_metadata,
3620            )]
3621            .into_iter()
3622            .collect(),
3623            metadata_columns: HashMap::new(),
3624        };
3625
3626        let result = StructType::try_new([
3627            StructField::nullable("regular_col", DataType::STRING),
3628            StructField::nullable("nested", DataType::Struct(Box::new(nested_struct))),
3629        ]);
3630
3631        assert_result_error_with_message(result, "only allowed at the top level");
3632        Ok(())
3633    }
3634
3635    #[test]
3636    fn test_nested_metadata_columns_validation_array() -> DeltaResult<()> {
3637        // Test that metadata columns in array element structs are rejected
3638        let nested_field_with_metadata =
3639            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3640        let nested_struct = StructType {
3641            type_name: "struct".into(),
3642            fields: [(
3643                nested_field_with_metadata.name.clone(),
3644                nested_field_with_metadata,
3645            )]
3646            .into_iter()
3647            .collect(),
3648            metadata_columns: HashMap::new(),
3649        };
3650        let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct)), true);
3651
3652        let result = StructType::try_new([
3653            StructField::nullable("regular_col", DataType::STRING),
3654            StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
3655        ]);
3656
3657        assert_result_error_with_message(result, "only allowed at the top level");
3658        Ok(())
3659    }
3660
3661    #[test]
3662    fn test_nested_metadata_columns_validation_map() -> DeltaResult<()> {
3663        // Test that metadata columns in map key structs or map value structs are rejected
3664        let nested_field_with_metadata =
3665            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3666        let nested_struct = StructType {
3667            type_name: "struct".into(),
3668            fields: [(
3669                nested_field_with_metadata.name.clone(),
3670                nested_field_with_metadata,
3671            )]
3672            .into_iter()
3673            .collect(),
3674            metadata_columns: HashMap::new(),
3675        };
3676
3677        for map_type in [
3678            MapType::new(
3679                DataType::Struct(Box::new(nested_struct.clone())),
3680                DataType::STRING,
3681                true,
3682            ),
3683            MapType::new(
3684                DataType::STRING,
3685                DataType::Struct(Box::new(nested_struct)),
3686                true,
3687            ),
3688        ] {
3689            let result = StructType::try_new([
3690                StructField::nullable("regular_col", DataType::STRING),
3691                StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
3692            ]);
3693
3694            assert_result_error_with_message(result, "only allowed at the top level");
3695        }
3696
3697        Ok(())
3698    }
3699
3700    #[test]
3701    fn test_column_identifier_trait() -> DeltaResult<()> {
3702        let schema = StructType::try_new([
3703            StructField::nullable("regular_col", DataType::STRING),
3704            StructField::create_metadata_column("row_index_col", MetadataColumnSpec::RowIndex),
3705        ])?;
3706
3707        // Test string identifier
3708        assert!(schema.contains("regular_col"));
3709        assert!(schema.contains("row_index_col"));
3710        assert!(!schema.contains("nonexistent"));
3711
3712        // Test String identifier
3713        assert!(schema.contains("regular_col"));
3714        assert!(schema.contains("row_index_col"));
3715
3716        // Test MetadataColumnSpec identifier
3717        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3718        assert!(!schema.contains_metadata_column(&MetadataColumnSpec::RowId));
3719        Ok(())
3720    }
3721
3722    #[test]
3723    fn test_metadata_column_serialization() -> DeltaResult<()> {
3724        let field = StructField::create_metadata_column("test_row_id", MetadataColumnSpec::RowId);
3725
3726        // Test that serialization works
3727        let json = serde_json::to_string(&field)?;
3728        let deserialized: StructField = serde_json::from_str(&json)?;
3729
3730        assert_eq!(deserialized.name(), field.name());
3731        assert_eq!(deserialized.data_type(), field.data_type());
3732        assert_eq!(deserialized.nullable, field.nullable);
3733        assert!(deserialized.is_metadata_column());
3734        assert_eq!(
3735            deserialized.get_metadata_column_spec(),
3736            Some(MetadataColumnSpec::RowId)
3737        );
3738        Ok(())
3739    }
3740
3741    #[test]
3742    fn test_all_metadata_column_specs() -> DeltaResult<()> {
3743        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3744
3745        let schema = schema
3746            .add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?
3747            .add_metadata_column("row_id", MetadataColumnSpec::RowId)?
3748            .add_metadata_column("row_commit_version", MetadataColumnSpec::RowCommitVersion)?;
3749
3750        assert_eq!(schema.fields().count(), 4);
3751        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3752        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowId));
3753        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowCommitVersion));
3754
3755        assert_eq!(
3756            schema.index_of_metadata_column(&MetadataColumnSpec::RowIndex),
3757            Some(&1)
3758        );
3759        assert_eq!(
3760            schema.index_of_metadata_column(&MetadataColumnSpec::RowId),
3761            Some(&2)
3762        );
3763        assert_eq!(
3764            schema.index_of_metadata_column(&MetadataColumnSpec::RowCommitVersion),
3765            Some(&3)
3766        );
3767        Ok(())
3768    }
3769
3770    #[test]
3771    fn test_physical_name_with_mode_none() {
3772        let field_json = r#"{
3773            "name": "logical_name",
3774            "type": "string",
3775            "nullable": true,
3776            "metadata": {
3777                "delta.columnMapping.physicalName": "physical_name_col123"
3778            }
3779        }"#;
3780        let field: StructField = serde_json::from_str(field_json).unwrap();
3781
3782        // With ColumnMappingMode::None, should return logical name even though physical name exists
3783        assert_eq!(field.physical_name(ColumnMappingMode::None), "logical_name");
3784    }
3785
3786    #[test]
3787    fn test_physical_name_with_mode_id() {
3788        let field_json = r#"{
3789            "name": "logical_name",
3790            "type": "string",
3791            "nullable": true,
3792            "metadata": {
3793                "delta.columnMapping.id": 5,
3794                "delta.columnMapping.physicalName": "physical_name_col123"
3795            }
3796        }"#;
3797        let field: StructField = serde_json::from_str(field_json).unwrap();
3798
3799        // With ColumnMappingMode::Id, should return physical name
3800        assert_eq!(
3801            field.physical_name(ColumnMappingMode::Id),
3802            "physical_name_col123"
3803        );
3804    }
3805
3806    #[test]
3807    fn test_physical_name_with_mode_name() {
3808        let field_json = r#"{
3809            "name": "logical_name",
3810            "type": "string",
3811            "nullable": true,
3812            "metadata": {
3813                "delta.columnMapping.physicalName": "physical_name_col456"
3814            }
3815        }"#;
3816        let field: StructField = serde_json::from_str(field_json).unwrap();
3817
3818        // With ColumnMappingMode::Name, should return physical name
3819        assert_eq!(
3820            field.physical_name(ColumnMappingMode::Name),
3821            "physical_name_col456"
3822        );
3823    }
3824
3825    #[test]
3826    fn test_physical_name_fallback_id() {
3827        let field_json = r#"{
3828            "name": "logical_name",
3829            "type": "string",
3830            "nullable": true,
3831            "metadata": {}
3832        }"#;
3833        let field: StructField = serde_json::from_str(field_json).unwrap();
3834
3835        // With ColumnMappingMode::Id but no physical name, should fallback to logical name
3836        assert_eq!(field.physical_name(ColumnMappingMode::Id), "logical_name");
3837    }
3838
3839    #[test]
3840    fn test_physical_name_fallback_name() {
3841        let field_json = r#"{
3842            "name": "logical_name",
3843            "type": "string",
3844            "nullable": true,
3845            "metadata": {}
3846        }"#;
3847        let field: StructField = serde_json::from_str(field_json).unwrap();
3848
3849        // With ColumnMappingMode::Name but no physical name, should fallback to logical name
3850        assert_eq!(field.physical_name(ColumnMappingMode::Name), "logical_name");
3851    }
3852
3853    #[test]
3854    fn test_display_struct_type_stable_output() -> DeltaResult<()> {
3855        let nested_field_with_metadata =
3856            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3857        let inner_struct =
3858            StructType::new_unchecked([StructField::new("q", DataType::LONG, false)]);
3859        let nested_struct = StructType::new_unchecked([
3860            nested_field_with_metadata,
3861            StructField::new("x", DataType::DOUBLE, true),
3862            StructField::new(
3863                "inner_struct",
3864                DataType::Struct(Box::new(inner_struct)),
3865                false,
3866            ),
3867        ]);
3868        let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct.clone())), true);
3869        let map_type = MapType::new(
3870            DataType::Struct(Box::new(nested_struct.clone())),
3871            DataType::Struct(Box::new(nested_struct.clone())), // kek
3872            true,
3873        );
3874        let fields = vec![
3875            StructField::new("x", DataType::DOUBLE, true),
3876            StructField::new("y", DataType::FLOAT, false),
3877            StructField::new("z", DataType::LONG, true),
3878            StructField::new("s", nested_struct.clone(), false),
3879            StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
3880            StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
3881            StructField::new("a", DataType::LONG, true),
3882        ];
3883
3884        let struct_type = StructType::new_unchecked(fields);
3885        assert_eq!(
3886            struct_type.to_string(),
3887            "struct:
3888├─x: double (is nullable: true, metadata: {})
3889├─y: float (is nullable: false, metadata: {})
3890├─z: long (is nullable: true, metadata: {})
3891├─s: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>> (is nullable: false, metadata: {})
3892│  ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3893│  ├─x: double (is nullable: true, metadata: {})
3894│  └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3895│     └─q: long (is nullable: false, metadata: {})
3896├─array_col: array<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
3897│  └─array_element: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
3898│     ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3899│     ├─x: double (is nullable: true, metadata: {})
3900│     └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3901│        └─q: long (is nullable: false, metadata: {})
3902├─map_col: map<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>, struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
3903│  ├─map_key: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
3904│  │  ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3905│  │  ├─x: double (is nullable: true, metadata: {})
3906│  │  └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3907│  │     └─q: long (is nullable: false, metadata: {})
3908│  └─map_value: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
3909│     ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3910│     ├─x: double (is nullable: true, metadata: {})
3911│     └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3912│        └─q: long (is nullable: false, metadata: {})
3913└─a: long (is nullable: true, metadata: {})
3914"
3915        );
3916
3917        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3918        let schema = schema
3919            .add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?
3920            .add_metadata_column("row_id", MetadataColumnSpec::RowId)?
3921            .add_metadata_column("row_commit_version", MetadataColumnSpec::RowCommitVersion)?;
3922        assert_eq!(schema.to_string(), "struct:
3923├─regular_col: string (is nullable: true, metadata: {})
3924├─row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3925├─row_id: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_id\")})
3926└─row_commit_version: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_commit_version\")})
3927");
3928        Ok(())
3929    }
3930
3931    #[test]
3932    fn test_builder_empty() {
3933        let schema = StructType::builder().build().unwrap();
3934        assert_eq!(schema.num_fields(), 0)
3935    }
3936
3937    #[test]
3938    fn test_builder_add_fields() {
3939        let schema = StructType::builder()
3940            .add_field(StructField::new("id", DataType::INTEGER, false))
3941            .add_field(StructField::new("name", DataType::STRING, true))
3942            .build()
3943            .unwrap();
3944
3945        assert_eq!(schema.num_fields(), 2);
3946        assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
3947        assert_eq!(schema.field_at_index(1).unwrap().name(), "name");
3948    }
3949
3950    #[test]
3951    fn test_builder_from_schema() {
3952        let base_schema =
3953            StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
3954
3955        let extended_schema = StructTypeBuilder::from_schema(&base_schema)
3956            .add_field(StructField::new("name", DataType::STRING, true))
3957            .build()
3958            .unwrap();
3959
3960        assert_eq!(extended_schema.num_fields(), 2);
3961        assert_eq!(extended_schema.field_at_index(0).unwrap().name(), "id");
3962        assert_eq!(extended_schema.field_at_index(1).unwrap().name(), "name");
3963    }
3964
3965    #[test]
3966    fn test_parquet_field_id_key_value() {
3967        // Verify the string value of ColumnMetadataKey::ParquetFieldId matches the convention
3968        // used by delta-spark and other Delta ecosystem implementations. This is not part of
3969        // the Delta protocol spec, so we pin the value here to catch accidental changes.
3970        assert_eq!(
3971            ColumnMetadataKey::ParquetFieldId.as_ref(),
3972            "parquet.field.id"
3973        );
3974    }
3975
3976    #[test]
3977    fn test_with_field_inserted_empty_struct() {
3978        let schema = StructType::try_new([]).unwrap();
3979        let schema = schema
3980            .with_field_inserted_after(None, StructField::new("age", DataType::STRING, true))
3981            .expect("with field inserted should produce a valid schema");
3982        assert_eq!(schema.num_fields(), 1);
3983        assert_eq!(schema.field_at_index(0).unwrap().name(), "age");
3984    }
3985
3986    #[test]
3987    fn test_with_field_inserted() {
3988        let schema = StructType::try_new([
3989            StructField::new("id", DataType::INTEGER, false),
3990            StructField::new("name", DataType::STRING, true),
3991        ])
3992        .unwrap();
3993        let schema = schema
3994            .with_field_inserted_after(Some("id"), StructField::new("age", DataType::STRING, true))
3995            .expect("with field inserted should produce a valid schema");
3996        assert_eq!(schema.num_fields(), 3);
3997        assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
3998        assert_eq!(schema.field_at_index(1).unwrap().name(), "age");
3999        assert_eq!(schema.field_at_index(2).unwrap().name(), "name");
4000    }
4001
4002    #[test]
4003    fn test_with_field_inserted_append_to_end() {
4004        let schema = StructType::try_new([
4005            StructField::new("id", DataType::INTEGER, false),
4006            StructField::new("name", DataType::STRING, true),
4007        ])
4008        .unwrap();
4009        let schema = schema
4010            .with_field_inserted_after(None, StructField::new("age", DataType::STRING, true))
4011            .expect("with field inserted should produce a valid schema");
4012
4013        assert_eq!(schema.num_fields(), 3);
4014        assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
4015        assert_eq!(schema.field_at_index(1).unwrap().name(), "name");
4016        assert_eq!(schema.field_at_index(2).unwrap().name(), "age");
4017    }
4018
4019    #[test]
4020    fn test_with_field_inserted_after_non_existent_field() {
4021        let schema =
4022            StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4023        let new_schema = schema.with_field_inserted_after(
4024            Some("nonexistent"),
4025            StructField::new("name", DataType::STRING, true),
4026        );
4027        assert!(new_schema.is_err());
4028    }
4029
4030    #[test]
4031    fn test_with_field_inserted_after_duplicate_field() {
4032        let schema = StructType::try_new([
4033            StructField::new("id", DataType::INTEGER, false),
4034            StructField::new("name", DataType::STRING, true),
4035        ])
4036        .unwrap();
4037        let new_schema = schema.with_field_inserted_after(
4038            Some("name"),
4039            StructField::new("id", DataType::STRING, true),
4040        );
4041        assert!(new_schema.is_err());
4042        assert_result_error_with_message(new_schema, "Field id already exists");
4043    }
4044
4045    #[test]
4046    fn test_with_field_inserted_before() {
4047        let schema = StructType::try_new([
4048            StructField::new("id", DataType::INTEGER, false),
4049            StructField::new("name", DataType::STRING, true),
4050        ])
4051        .unwrap();
4052        let schema = schema
4053            .with_field_inserted_before(
4054                Some("name"),
4055                StructField::new("age", DataType::STRING, true),
4056            )
4057            .expect("with field inserted before should produce a valid schema");
4058        assert_eq!(schema.num_fields(), 3);
4059        assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
4060        assert_eq!(schema.field_at_index(1).unwrap().name(), "age");
4061        assert_eq!(schema.field_at_index(2).unwrap().name(), "name");
4062    }
4063
4064    #[test]
4065    fn test_with_field_inserted_before_duplicate_field() {
4066        let schema = StructType::try_new([
4067            StructField::new("id", DataType::INTEGER, false),
4068            StructField::new("name", DataType::STRING, true),
4069        ])
4070        .unwrap();
4071        let new_schema = schema.with_field_inserted_before(
4072            Some("name"),
4073            StructField::new("id", DataType::STRING, true),
4074        );
4075        assert!(new_schema.is_err());
4076        assert_result_error_with_message(new_schema, "Field id already exists");
4077    }
4078
4079    #[test]
4080    fn test_with_field_inserted_before_at_beginning() {
4081        let schema = StructType::try_new([
4082            StructField::new("id", DataType::INTEGER, false),
4083            StructField::new("name", DataType::STRING, true),
4084        ])
4085        .unwrap();
4086        let schema = schema
4087            .with_field_inserted_before(None, StructField::new("age", DataType::STRING, true))
4088            .expect("with field inserted before should produce a valid schema");
4089        assert_eq!(schema.num_fields(), 3);
4090        assert_eq!(schema.field_at_index(0).unwrap().name(), "age");
4091        assert_eq!(schema.field_at_index(1).unwrap().name(), "id");
4092        assert_eq!(schema.field_at_index(2).unwrap().name(), "name");
4093    }
4094
4095    #[test]
4096    fn test_with_field_inserted_before_non_existent_field() {
4097        let schema =
4098            StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4099        let new_schema = schema.with_field_inserted_before(
4100            Some("nonexistent"),
4101            StructField::new("name", DataType::STRING, true),
4102        );
4103        assert!(new_schema.is_err());
4104    }
4105
4106    #[test]
4107    fn test_with_field_inserted_before_empty_struct() {
4108        let schema = StructType::try_new([]).unwrap();
4109        let schema = schema
4110            .with_field_inserted_before(None, StructField::new("age", DataType::STRING, true))
4111            .expect("with field inserted before on empty struct should succeed");
4112        assert_eq!(schema.num_fields(), 1);
4113        assert_eq!(schema.field_at_index(0).unwrap().name(), "age");
4114    }
4115
4116    #[test]
4117    fn test_with_field_removed() {
4118        let schema =
4119            StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4120        let new_schema = schema.with_field_removed("id");
4121        assert_eq!(new_schema.num_fields(), 0);
4122    }
4123
4124    #[test]
4125    fn test_with_field_removed_non_existent_field() {
4126        let schema =
4127            StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4128        let new_schema = schema.with_field_removed("nonexistent");
4129        assert_eq!(new_schema.num_fields(), 1);
4130        assert_eq!(new_schema.field_at_index(0).unwrap().name(), "id");
4131    }
4132
4133    #[test]
4134    fn test_with_field_replaced() {
4135        let schema =
4136            StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4137        let new_schema = schema
4138            .with_field_replaced("id", StructField::new("name", DataType::STRING, true))
4139            .unwrap();
4140
4141        assert_eq!(new_schema.num_fields(), 1);
4142        assert_eq!(new_schema.field_at_index(0).unwrap().name(), "name");
4143    }
4144
4145    #[test]
4146    fn test_with_field_replaced_non_existent_field() {
4147        let schema =
4148            StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4149        let new_schema = schema.with_field_replaced(
4150            "nonexistent",
4151            StructField::new("name", DataType::STRING, true),
4152        );
4153        assert!(new_schema.is_err(), "Expected error for non-existent field");
4154    }
4155
4156    /// Schema: { a: { b: { c: double } } } — supports walks at depths 1, 2, and 3.
4157    fn walk_test_schema() -> StructType {
4158        let l3 = StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)]);
4159        let l2 = StructType::new_unchecked([StructField::new(
4160            "b",
4161            DataType::Struct(Box::new(l3)),
4162            false,
4163        )]);
4164        StructType::new_unchecked([StructField::new("a", DataType::Struct(Box::new(l2)), false)])
4165    }
4166
4167    #[rstest::rstest]
4168    #[case::single_level(vec!["a"], vec!["a"], DataType::Struct(Box::new(
4169        StructType::new_unchecked([StructField::new("b", DataType::Struct(Box::new(
4170            StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4171        )), false)])
4172    )))]
4173    #[case::nested_2(vec!["a", "b"], vec!["a", "b"], DataType::Struct(Box::new(
4174        StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4175    )))]
4176    #[case::nested_3(vec!["a", "b", "c"], vec!["a", "b", "c"], DataType::DOUBLE)]
4177    #[test]
4178    fn test_walk_column_fields_happy(
4179        #[case] col_path: Vec<&str>,
4180        #[case] expected_names: Vec<&str>,
4181        #[case] expected_leaf_type: DataType,
4182    ) {
4183        let schema = walk_test_schema();
4184        let fields = schema
4185            .walk_column_fields(&ColumnName::new(col_path.iter().copied()))
4186            .unwrap();
4187        assert_eq!(fields.len(), expected_names.len());
4188        for (field, name) in fields.iter().zip(expected_names.iter()) {
4189            assert_eq!(field.name(), *name);
4190        }
4191        assert_eq!(fields.last().unwrap().data_type(), &expected_leaf_type);
4192    }
4193
4194    #[rstest::rstest]
4195    #[case::empty_path(vec![], "Column path cannot be empty")]
4196    #[case::not_found_top(vec!["x"], "not found in schema")]
4197    #[case::not_found_nested(vec!["a", "x"], "not found in schema")]
4198    #[case::intermediate_not_struct(vec!["a", "b", "c", "d"], "not a struct type")]
4199    #[test]
4200    fn test_walk_column_fields_error(#[case] col_path: Vec<&str>, #[case] expected_error: &str) {
4201        let schema = walk_test_schema();
4202        let result = schema.walk_column_fields(&ColumnName::new(col_path.iter().copied()));
4203        assert_result_error_with_message(result, expected_error);
4204    }
4205
4206    #[test]
4207    fn test_normalize_column_names_to_schema_casing() {
4208        let inner =
4209            StructType::new_unchecked(vec![StructField::new("City", DataType::STRING, false)]);
4210        let schema = StructType::new_unchecked(vec![
4211            StructField::new("id", DataType::INTEGER, false),
4212            StructField::new("EventDate", DataType::DATE, false),
4213            StructField::new("Address", DataType::Struct(Box::new(inner)), false),
4214        ]);
4215
4216        // Mismatched casing -> normalized to schema
4217        let cols = vec![ColumnName::new(["eventdate"])];
4218        assert_eq!(
4219            normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4220            ["EventDate"]
4221        );
4222
4223        // Nested path -> each field name normalized
4224        let cols = vec![ColumnName::new(["address", "city"])];
4225        assert_eq!(
4226            normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4227            ["Address", "City"]
4228        );
4229
4230        // Already matching -> unchanged
4231        let cols = vec![ColumnName::new(["id"])];
4232        assert_eq!(
4233            normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4234            ["id"]
4235        );
4236
4237        // Unrecognized -> keeps original
4238        let cols = vec![ColumnName::new(["nonexistent"])];
4239        assert_eq!(
4240            normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4241            ["nonexistent"]
4242        );
4243    }
4244}