Skip to main content

buoyant_kernel/schema/
mod.rs

1//! Definitions and functions to create and manipulate kernel schema
2
3use std::borrow::Cow;
4use std::collections::{HashMap, HashSet};
5use std::fmt::{Debug, Display, Formatter};
6use std::iter::{DoubleEndedIterator, FusedIterator};
7use std::str::FromStr;
8use std::sync::{Arc, LazyLock};
9
10use delta_kernel_derive::internal_api;
11use indexmap::IndexMap;
12use itertools::Itertools;
13use serde::{Deserialize, Serialize};
14use tracing::warn;
15
16// re-export because many call sites that use schemas do not necessarily use expressions
17pub(crate) use crate::expressions::{column_name, ColumnName};
18use crate::reserved_field_ids::FILE_NAME;
19use crate::table_features::{
20    validate_and_extract_column_mapping_annotations, validate_column_mapping_id, ColumnMappingMode,
21};
22use crate::transforms::{transform_output_type, SchemaTransform};
23use crate::utils::require;
24use crate::{DeltaResult, Error};
25
26pub(crate) mod compare;
27#[cfg(feature = "schema-diff")]
28pub(crate) mod diff;
29
30#[cfg(feature = "internal-api")]
31pub mod derive_macro_utils;
32#[cfg(not(feature = "internal-api"))]
33pub(crate) mod derive_macro_utils;
34pub(crate) mod validation;
35pub(crate) mod variant_utils;
36
/// Alias for [`StructType`], the type used to represent a top-level table schema.
pub type Schema = StructType;
/// A shared, reference-counted ([`Arc`]) [`StructType`].
pub type SchemaRef = Arc<StructType>;
39
/// Converts a type to a [`Schema`] that represents that type. Derivable for struct types using the
/// [`delta_kernel_derive::ToSchema`] derive macro.
#[internal_api]
pub(crate) trait ToSchema {
    /// Returns the [`StructType`] describing the implementing type. Takes no receiver, so the
    /// schema is a property of the type rather than of any particular value.
    fn to_schema() -> StructType;
}
46
/// A single value stored in a [`StructField`]'s metadata map.
///
/// (De)serialized with `#[serde(untagged)]`, i.e. as the bare JSON value with no variant tag.
// NOTE(review): untagged enums are presumably matched in declaration order during
// deserialization, so the catch-all `Other` should stay last -- confirm against serde docs.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[serde(untagged)]
pub enum MetadataValue {
    /// A 64-bit signed integer value.
    Number(i64),
    /// A string value.
    String(String),
    /// A boolean value.
    Boolean(bool),
    // The [PROTOCOL](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-field) states
    // only that the metadata is "A JSON map containing information about this column.", so we can
    // actually have any valid json here. `Other` is therefore a catchall for things we don't need
    // to handle.
    /// Any other JSON value, kept as-is.
    Other(serde_json::Value),
}
59
60impl Display for MetadataValue {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        match self {
63            MetadataValue::Number(n) => write!(f, "{n}"),
64            MetadataValue::String(s) => write!(f, "{s}"),
65            MetadataValue::Boolean(b) => write!(f, "{b}"),
66            MetadataValue::Other(v) => write!(f, "{v}"), // just write the json back
67        }
68    }
69}
70
71impl From<String> for MetadataValue {
72    fn from(value: String) -> Self {
73        Self::String(value)
74    }
75}
76
77impl From<&String> for MetadataValue {
78    fn from(value: &String) -> Self {
79        Self::String(value.clone())
80    }
81}
82
83impl From<&str> for MetadataValue {
84    fn from(value: &str) -> Self {
85        Self::String(value.to_string())
86    }
87}
88
89impl From<i64> for MetadataValue {
90    fn from(value: i64) -> Self {
91        Self::Number(value)
92    }
93}
94
95impl From<bool> for MetadataValue {
96    fn from(value: bool) -> Self {
97        Self::Boolean(value)
98    }
99}
100
/// Well-known keys of a [`StructField`]'s metadata map. The string form of each key is given by
/// the `AsRef<str>` implementation.
#[derive(Debug)]
pub enum ColumnMetadataKey {
    /// Key `delta.columnMapping.id`.
    ColumnMappingId,
    /// Key `delta.columnMapping.physicalName`.
    ColumnMappingPhysicalName,
    /// Parquet field IDs for the synthesized `element` / `key` / `value` fields of an Array or
    /// Map. Stored on the *nearest ancestor* StructField as a JSON object whose keys are
    /// dot-paths rooted at that field's name.
    ///
    /// # Example: list-in-map
    ///
    /// For `m: map<int, array<int>>` and the key/value/element fields having field ids
    /// 100/101/102, the metadata on `m` should be:
    ///
    /// ```json
    /// {
    ///   "delta.columnMapping.nested.ids": {
    ///     "m.key":           100,
    ///     "m.value":         101,
    ///     "m.value.element": 102
    ///   }
    /// }
    /// ```
    ColumnMappingNestedIds,
    /// Key `parquet.field.id`.
    ParquetFieldId,
    /// Key `parquet.field.nested.ids`.
    ParquetFieldNestedIds,
    /// Key `delta.generationExpression`.
    GenerationExpression,
    /// Key `delta.identity.start`.
    IdentityStart,
    /// Key `delta.identity.step`.
    IdentityStep,
    /// Key `delta.identity.highWaterMark`.
    IdentityHighWaterMark,
    /// Key `delta.identity.allowExplicitInsert`.
    IdentityAllowExplicitInsert,
    /// Key `delta.isInternalColumn`; set to `true` on fields marked as internal columns.
    InternalColumn,
    /// Key `delta.invariants`.
    Invariants,
    /// Key `delta.metadataSpec`; holds the spec text of a metadata column.
    MetadataSpec,
}
135
136impl AsRef<str> for ColumnMetadataKey {
137    fn as_ref(&self) -> &str {
138        match self {
139            Self::ColumnMappingId => "delta.columnMapping.id",
140            Self::ColumnMappingPhysicalName => "delta.columnMapping.physicalName",
141            Self::ColumnMappingNestedIds => "delta.columnMapping.nested.ids",
142            // "parquet.field.id" is not defined by the Delta protocol, but follows the convention
143            // established by delta-spark and other Delta ecosystem implementations for storing
144            // Parquet field IDs in StructField metadata.
145            Self::ParquetFieldId => "parquet.field.id",
146            // The Delta protocol defines this key for IcebergCompatV2/V3 nested field ids. It is
147            // legacy and will be replaced by `delta.columnMapping.nested.ids` (which kernel
148            // uses everywhere). Kept here for protocol compatibility only.
149            // Tracking issue: <https://github.com/delta-io/delta/issues/6688>
150            Self::ParquetFieldNestedIds => "parquet.field.nested.ids",
151            Self::GenerationExpression => "delta.generationExpression",
152            Self::IdentityAllowExplicitInsert => "delta.identity.allowExplicitInsert",
153            Self::IdentityHighWaterMark => "delta.identity.highWaterMark",
154            Self::IdentityStart => "delta.identity.start",
155            Self::IdentityStep => "delta.identity.step",
156            Self::InternalColumn => "delta.isInternalColumn",
157            Self::Invariants => "delta.invariants",
158            Self::MetadataSpec => "delta.metadataSpec",
159        }
160    }
161}
162
/// Enumeration of metadata columns recognized by Delta Kernel.
///
/// Metadata columns provide additional information about rows in a Delta table.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum MetadataColumnSpec {
    /// Spec text `row_index`.
    RowIndex,
    /// Spec text `row_id`.
    RowId,
    /// Spec text `row_commit_version`.
    RowCommitVersion,
    /// Spec text `_file`.
    FilePath,
}
173
174impl MetadataColumnSpec {
175    /// A human-readable name for the specified metadata column.
176    pub fn text_value(&self) -> &'static str {
177        match self {
178            Self::RowIndex => "row_index",
179            Self::RowId => "row_id",
180            Self::RowCommitVersion => "row_commit_version",
181            Self::FilePath => "_file",
182        }
183    }
184
185    /// The data type of the specified metadata column.
186    pub fn data_type(&self) -> DataType {
187        match self {
188            Self::RowIndex => DataType::LONG,
189            Self::RowId => DataType::LONG,
190            Self::RowCommitVersion => DataType::LONG,
191            Self::FilePath => DataType::STRING,
192        }
193    }
194
195    /// Whether the specified metadata column is nullable.
196    pub fn nullable(&self) -> bool {
197        match self {
198            Self::RowIndex => false,
199            Self::RowId => false,
200            Self::RowCommitVersion => false,
201            Self::FilePath => false,
202        }
203    }
204
205    /// The reserved field ID for the specified metadata column, if any.
206    pub fn reserved_field_id(&self) -> Option<i64> {
207        match self {
208            Self::FilePath => Some(FILE_NAME),
209            _ => None,
210        }
211    }
212}
213
214impl FromStr for MetadataColumnSpec {
215    type Err = Error;
216
217    fn from_str(s: &str) -> Result<Self, Self::Err> {
218        match s {
219            "row_index" => Ok(Self::RowIndex),
220            "row_id" => Ok(Self::RowId),
221            "row_commit_version" => Ok(Self::RowCommitVersion),
222            "_file" => Ok(Self::FilePath),
223            _ => Err(Error::Schema(format!("Unknown metadata column spec: {s}"))),
224        }
225    }
226}
227
/// A single named field of a [`StructType`]: its name, data type, nullability and metadata.
///
/// (De)serialized with serde; the `data_type` member is stored under the key `type`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
pub struct StructField {
    /// Name of this (possibly nested) column
    pub name: String,
    /// The data type of this field
    #[serde(rename = "type")]
    pub data_type: DataType,
    /// Denotes whether this Field can be null
    pub nullable: bool,
    /// A JSON map containing information about this column
    pub metadata: HashMap<String, MetadataValue>,
}
240
/// Parsed (and validated) pre-existing column-mapping annotations on a single field, as
/// returned by [`StructField::validate_and_extract_existing_column_mapping_annotations`].
/// Either, both, or neither of the two fields may be present; a field with neither set has no
/// pre-populated column-mapping metadata.
///
/// The lifetime `'a` is that of the [`StructField`] whose metadata `physical_name` borrows from.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ExistingColumnMappingAnnotations<'a> {
    /// Parsed `delta.columnMapping.id`, if present. Guaranteed non-negative.
    pub id: Option<i64>,
    /// Borrowed `delta.columnMapping.physicalName`, if present. Guaranteed non-empty.
    pub physical_name: Option<&'a str>,
}
252
253impl StructField {
254    /// The name of the default row index metadata column.
255    ///
256    /// Note that the dot does not indicate a nested field, it is just a separator for the metadata
257    /// column name.
258    const DEFAULT_ROW_INDEX_COLUMN_NAME: &'static str = "_metadata.row_index";
259
260    /////////////////
261    // Static methods
262    /////////////////
263
264    /// Creates a new field
265    pub fn new(name: impl Into<String>, data_type: impl Into<DataType>, nullable: bool) -> Self {
266        Self {
267            name: name.into(),
268            data_type: data_type.into(),
269            nullable,
270            metadata: HashMap::default(),
271        }
272    }
273
274    /// Creates a new nullable field
275    pub fn nullable(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
276        Self::new(name, data_type, true)
277    }
278
279    /// Creates a new non-nullable field
280    pub fn not_null(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
281        Self::new(name, data_type, false)
282    }
283
284    /// Creates a metadata column of the given spec with the given name.
285    pub fn create_metadata_column(name: impl Into<String>, spec: MetadataColumnSpec) -> Self {
286        let mut metadata = HashMap::new();
287        metadata.insert(
288            ColumnMetadataKey::MetadataSpec.as_ref().to_string(),
289            MetadataValue::String(spec.text_value().to_string()),
290        );
291
292        Self {
293            name: name.into(),
294            data_type: spec.data_type(),
295            nullable: spec.nullable(),
296            metadata,
297        }
298    }
299
300    /// Returns the default row index metadata column used by Kernel.
301    pub fn default_row_index_column() -> &'static StructField {
302        static DEFAULT_ROW_INDEX_COLUMN: LazyLock<StructField> = LazyLock::new(|| {
303            StructField::create_metadata_column(
304                StructField::DEFAULT_ROW_INDEX_COLUMN_NAME,
305                MetadataColumnSpec::RowIndex,
306            )
307        });
308        &DEFAULT_ROW_INDEX_COLUMN
309    }
310
311    ///////////////////
312    // Instance methods
313    ///////////////////
314
315    /// Replaces `self.metadata` with the list of <key, value> pairs in `metadata`.
316    pub fn with_metadata(
317        mut self,
318        metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
319    ) -> Self {
320        self.metadata = metadata
321            .into_iter()
322            .map(|(k, v)| (k.into(), v.into()))
323            .collect();
324        self
325    }
326
327    /// Extends `self.metadata` to include the <key, value> pairs in `metadata`.
328    pub fn add_metadata(
329        mut self,
330        metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
331    ) -> Self {
332        self.metadata
333            .extend(metadata.into_iter().map(|(k, v)| (k.into(), v.into())));
334        self
335    }
336
337    /// Returns true if this field is a metadata column.
338    pub fn is_metadata_column(&self) -> bool {
339        self.metadata
340            .contains_key(ColumnMetadataKey::MetadataSpec.as_ref())
341    }
342
343    /// Returns the metadata column spec if this is a metadata column, otherwise returns None.
344    pub fn get_metadata_column_spec(&self) -> Option<MetadataColumnSpec> {
345        match self
346            .metadata
347            .get(ColumnMetadataKey::MetadataSpec.as_ref())?
348        {
349            MetadataValue::String(s) => MetadataColumnSpec::from_str(s).ok(),
350            _ => None,
351        }
352    }
353
354    /// Returns true if this field is an internal column added by Kernel.
355    ///
356    /// Internal columns must be removed before returning scan results to the user.
357    pub fn is_internal_column(&self) -> bool {
358        matches!(
359            self.metadata
360                .get(ColumnMetadataKey::InternalColumn.as_ref()),
361            Some(MetadataValue::Boolean(true))
362        )
363    }
364
365    /// Marks this field as an internal column.
366    pub fn as_internal_column(self) -> Self {
367        self.add_metadata(vec![(
368            ColumnMetadataKey::InternalColumn.as_ref().to_string(),
369            MetadataValue::Boolean(true),
370        )])
371    }
372
373    pub fn get_config_value(&self, key: &ColumnMetadataKey) -> Option<&MetadataValue> {
374        self.metadata.get(key.as_ref())
375    }
376
377    /// Returns this field's `delta.columnMapping.id` annotation if present and well-formed.
378    /// Returns `None` if the annotation is missing or carries a non-numeric value.
379    pub fn column_mapping_id(&self) -> Option<i64> {
380        match self.get_config_value(&ColumnMetadataKey::ColumnMappingId)? {
381            MetadataValue::Number(n) => Some(*n),
382            _ => None,
383        }
384    }
385
386    /// Validates and extracts pre-existing column-mapping annotations on this field, returning
387    /// the parsed `id` and `physical_name` borrowed from the field's metadata. Returning the
388    /// parsed values lets the column-mapping assignment dispatch match on
389    /// `(Option<i64>, Option<&str>)` directly, which makes the dispatch total and obviates a
390    /// catch-all panic for malformed annotations the validator already rejects.
391    ///
392    /// Rejects:
393    /// - `delta.columnMapping.id` is present but not a `MetadataValue::Number`,
394    /// - `delta.columnMapping.id` is a `Number` but lies outside the protocol's 32-bit non-negative
395    ///   range (negative or `> i32::MAX`); see
396    ///   [`crate::table_features::validate_column_mapping_id`],
397    /// - `delta.columnMapping.physicalName` is present but not a `MetadataValue::String`,
398    /// - `delta.columnMapping.physicalName` is an empty `String`.
399    ///
400    /// Empty-name, negative-id, and over-`i32::MAX` id are stricter than delta-spark (which
401    /// accepts the first two and historically truncates the third); kernel fails fast at
402    /// write time so a connector that supplies bad metadata learns about it on the call that
403    /// produced it. Wrong-typed `id` errors take precedence over wrong-typed `physicalName`
404    /// errors (a connector that fixes the `id` and retries will then see the `physicalName`
405    /// error).
406    pub(crate) fn validate_and_extract_existing_column_mapping_annotations(
407        &self,
408    ) -> DeltaResult<ExistingColumnMappingAnnotations<'_>> {
409        let id = match self.get_config_value(&ColumnMetadataKey::ColumnMappingId) {
410            Some(MetadataValue::Number(n)) => {
411                validate_column_mapping_id(*n)
412                    .map_err(|e| Error::schema(format!("Field '{}': {e}", self.name)))?;
413                Some(*n)
414            }
415            None => None,
416            Some(_) => {
417                return Err(Error::schema(format!(
418                    "Field '{}' has a non-numeric `{}` annotation",
419                    self.name,
420                    ColumnMetadataKey::ColumnMappingId.as_ref(),
421                )));
422            }
423        };
424        let physical_name =
425            match self.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) {
426                Some(MetadataValue::String(s)) if s.is_empty() => {
427                    return Err(Error::schema(format!(
428                        "Field '{}' has an empty `{}` annotation",
429                        self.name,
430                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
431                    )));
432                }
433                Some(MetadataValue::String(s)) => Some(s.as_str()),
434                None => None,
435                Some(_) => {
436                    return Err(Error::schema(format!(
437                        "Field '{}' has a non-string `{}` annotation",
438                        self.name,
439                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
440                    )));
441                }
442            };
443        Ok(ExistingColumnMappingAnnotations { id, physical_name })
444    }
445
    /// Recursively collects every `delta.columnMapping.id` reachable from this field --
    /// the field's own ID plus any nested struct fields under Struct/Array/Map/Variant.
    /// Test-only.
    ///
    /// Only well-formed (numeric) ids are collected, since collection goes through
    /// [`StructField::column_mapping_id`]. Duplicates are not removed.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn collect_column_mapping_ids(&self) -> Vec<i64> {
        // Visitor that accumulates the ids it sees, in visit order.
        struct CollectIds(Vec<i64>);
        impl<'a> SchemaTransform<'a> for CollectIds {
            // NOTE(review): presumably declares that this transform produces `()` for every
            // node kind -- confirm against the `transform_output_type!` macro definition.
            transform_output_type!(|'a, T| ());

            fn transform_struct_field(&mut self, field: &'a StructField) {
                // Record this field's own id (if any) before descending into its children.
                if let Some(id) = field.column_mapping_id() {
                    self.0.push(id);
                }
                self.recurse_into_struct_field(field)
            }
        }
        let mut visitor = CollectIds(Vec::new());
        visitor.transform_struct_field(self);
        visitor.0
    }
466
467    /// Get the physical name for this field as it should be read from parquet.
468    ///
469    /// When `column_mapping_mode` is `None`, always returns the logical name (even if physical
470    /// name metadata is present). When mode is `Id` or `Name`, returns the physical name from
471    /// metadata if present, otherwise returns the logical name.
472    ///
473    /// NOTE: Caller affirms that the schema was already validated by
474    /// [`crate::table_configuration::TableConfiguration::try_new`], to ensure that annotations are
475    /// always and only present when column mapping mode is enabled.
476    #[internal_api]
477    pub(crate) fn physical_name(&self, column_mapping_mode: ColumnMappingMode) -> &str {
478        match column_mapping_mode {
479            ColumnMappingMode::None => &self.name,
480            ColumnMappingMode::Id | ColumnMappingMode::Name => {
481                match self
482                    .metadata
483                    .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
484                {
485                    Some(MetadataValue::String(physical_name)) => physical_name,
486                    _ => &self.name,
487                }
488            }
489        }
490    }
491
492    /// Returns true if this field has a physical name annotation
493    /// in its column mapping metadata.
494    pub(crate) fn has_physical_name_annotation(&self) -> bool {
495        matches!(
496            self.metadata
497                .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()),
498            Some(MetadataValue::String(_))
499        )
500    }
501
502    /// Returns true if this field has a column mapping ID annotation
503    /// in its column mapping metadata.
504    pub(crate) fn has_id_annotation(&self) -> bool {
505        matches!(
506            self.metadata
507                .get(ColumnMetadataKey::ColumnMappingId.as_ref()),
508            Some(MetadataValue::Number(_))
509        )
510    }
511
512    /// Change the name of a field. The field will preserve its data type and nullability. Note that
513    /// this allocates a new field.
514    pub fn with_name(&self, new_name: impl Into<String>) -> Self {
515        StructField {
516            name: new_name.into(),
517            data_type: self.data_type().clone(),
518            nullable: self.nullable,
519            metadata: self.metadata.clone(),
520        }
521    }
522
    /// Returns the name of this field.
    #[inline]
    pub fn name(&self) -> &String {
        &self.name
    }

    /// Returns whether this field can be null.
    #[inline]
    pub fn is_nullable(&self) -> bool {
        self.nullable
    }

    /// Returns the data type of this field.
    #[inline]
    pub const fn data_type(&self) -> &DataType {
        &self.data_type
    }

    /// Returns the metadata map of this field.
    #[inline]
    pub const fn metadata(&self) -> &HashMap<String, MetadataValue> {
        &self.metadata
    }
542
543    /// Convert our metadata into a HashMap<String, String>. Note this copies all the data so can be
544    /// expensive for large metadata
545    pub fn metadata_with_string_values(&self) -> HashMap<String, String> {
546        self.metadata
547            .iter()
548            .map(|(key, val)| (key.clone(), val.to_string()))
549            .collect()
550    }
551
552    /// Applies physical name and field ID mappings to this field.
553    ///
554    /// This function sets the field ID for the physical [`StructField`] only if the
555    /// `column_mapping_mode` is `Id`. The field ID is specified using the
556    /// [`ColumnMetadataKey::ParquetFieldId`] metadata field. Readers should use
557    /// [`ColumnMetadataKey::ParquetFieldId`] to match fields to the Parquet schema.
558    /// If a physical StructField contains a field ID, the reader must resolve columns
559    /// with that ID. Otherwise, the physical StructField's name is used. For details,
560    /// see [`read_parquet_files`].
561    ///
562    /// This function also sets the physical name of a field. If `column_mapping_mode` is
563    /// `Id` or `Name`, this is specified in [`ColumnMetadataKey::ColumnMappingPhysicalName`].
564    /// Otherwise, the field's logical name is used.
565    ///
566    /// Returns an error if a field has invalid or inconsistent column mapping annotations (e.g.
567    /// missing when column mapping is enabled, present when disabled, or wrong type), or if a
568    /// metadata column is encountered (metadata columns should not participate in column mapping).
569    ///
570    /// [`read_parquet_files`]: crate::ParquetHandler::read_parquet_files
571    #[internal_api]
572    pub(crate) fn make_physical(
573        &self,
574        column_mapping_mode: ColumnMappingMode,
575    ) -> DeltaResult<Self> {
576        MakePhysical::new(column_mapping_mode)
577            .transform_struct_field(self)
578            .map(|f| f.into_owned())
579    }
580
581    pub(crate) fn has_invariants(&self) -> bool {
582        self.metadata
583            .contains_key(ColumnMetadataKey::Invariants.as_ref())
584    }
585
586    /// Converts logical schema StructField metadata to physical schema metadata
587    /// based on the specified `column_mapping_mode`.
588    ///
589    /// NOTE: Must not be called on metadata columns, which are not subject to column mapping.
590    ///
591    /// NOTE: Caller affirms that `self` was already validated by
592    /// [`crate::table_features::validate_and_extract_column_mapping_annotations`], to ensure that
593    /// annotations are always and only present when column mapping mode is enabled.
594    fn logical_to_physical_metadata(
595        &self,
596        column_mapping_mode: ColumnMappingMode,
597    ) -> HashMap<String, MetadataValue> {
598        let mut base_metadata = self.metadata.clone();
599        let physical_name_key = ColumnMetadataKey::ColumnMappingPhysicalName.as_ref();
600        let field_id_key = ColumnMetadataKey::ColumnMappingId.as_ref();
601        let parquet_field_id_key = ColumnMetadataKey::ParquetFieldId.as_ref();
602        let field_id = base_metadata.get(ColumnMetadataKey::ColumnMappingId.as_ref());
603        match column_mapping_mode {
604            ColumnMappingMode::Id => {
605                let Some(MetadataValue::Number(fid)) = field_id else {
606                    // `validate_and_extract_column_mapping_annotations` should have verified that
607                    // this has a field Id
608                    warn!("StructField with name {} is missing field id in the Id column mapping mode", self.name());
609                    debug_assert!(false);
610                    return base_metadata;
611                };
612                // Insert the parquet field id matching the column mapping id
613                base_metadata.insert(
614                    parquet_field_id_key.to_string(),
615                    MetadataValue::Number(*fid),
616                );
617                // Ensure that physical name is present
618                debug_assert!(base_metadata.contains_key(physical_name_key));
619            }
620            ColumnMappingMode::Name => {
621                // Logical metadata should have the column mapping metadata keys
622                debug_assert!(base_metadata.contains_key(physical_name_key));
623                debug_assert!(base_metadata.contains_key(field_id_key));
624
625                // Retain column mapping id and insert parquet field id so that
626                // Parquet files carry field IDs in Name mode as well (matching
627                // the Delta protocol requirement and Delta Spark behaviour).
628                let Some(MetadataValue::Number(fid)) = field_id else {
629                    warn!("StructField with name {} is missing field id in the Name column mapping mode", self.name());
630                    debug_assert!(false);
631                    return base_metadata;
632                };
633                base_metadata.insert(
634                    parquet_field_id_key.to_string(),
635                    MetadataValue::Number(*fid),
636                );
637                // TODO(#1070): Remove nested column ids when they are supported in kernel
638            }
639            ColumnMappingMode::None => {
640                base_metadata.remove(physical_name_key);
641                base_metadata.remove(field_id_key);
642                base_metadata.remove(parquet_field_id_key);
643                // TODO(#1070): Remove nested column ids when they are supported in kernel
644            }
645        }
646        base_metadata
647    }
648}
649
650impl Display for StructField {
651    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
652        let mut metadata_str = String::from("{");
653        let mut first = true;
654        for (k, v) in self.metadata.iter() {
655            if !first {
656                metadata_str.push_str(", ");
657            }
658            first = false;
659            metadata_str.push_str(&format!("{k}: {v:?}"));
660        }
661        metadata_str.push('}');
662        write!(
663            f,
664            "{}: {} (is nullable: {}, metadata: {})",
665            self.name, self.data_type, self.nullable, metadata_str,
666        )
667    }
668}
669
/// A struct is used to represent both the top-level schema of the table
/// as well as struct columns that contain nested columns.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct StructType {
    /// The type tag of this struct. `try_new` and `new_unchecked` both set it to `"struct"`.
    type_name: String,
    /// The fields stored in this struct
    // We use indexmap to preserve the order of fields as they are defined in the schema
    // while also allowing for fast lookup by name. The alternative is to do a linear search
    // for each field by name would be potentially quite expensive for large schemas.
    fields: IndexMap<String, StructField>,
    /// The metadata columns in this struct, keyed by spec. The value is the position the
    /// metadata column's field had in the field sequence the struct was built from.
    // We use a dedicated map for metadata columns to allow for fast lookup without having to
    // iterate over all fields.
    metadata_columns: HashMap<MetadataColumnSpec, usize>,
}
685
686pub struct StructTypeBuilder {
687    fields: IndexMap<String, StructField>,
688}
689
690impl Default for StructTypeBuilder {
691    fn default() -> Self {
692        Self::new()
693    }
694}
695
696impl StructTypeBuilder {
697    pub fn new() -> Self {
698        Self {
699            fields: IndexMap::new(),
700        }
701    }
702
703    pub fn from_schema(schema: &StructType) -> Self {
704        Self {
705            fields: schema.fields.clone(),
706        }
707    }
708
709    pub fn add_field(mut self, field: StructField) -> Self {
710        self.fields.insert(field.name.clone(), field);
711        self
712    }
713
714    pub fn build(self) -> DeltaResult<StructType> {
715        StructType::try_new(self.fields.into_values())
716    }
717
718    pub fn build_arc_unchecked(self) -> Arc<StructType> {
719        Arc::new(StructType::new_unchecked(self.fields.into_values()))
720    }
721}
722
723impl StructType {
724    /// Creates a new [`StructType`] from the given fields.
725    ///
726    /// Returns an error if:
727    /// - the schema contains duplicate field names (case-insensitive; Delta column names are
728    ///   case-insensitive per the protocol)
729    /// - the schema contains duplicate metadata columns
730    /// - the schema contains nested metadata columns
731    pub fn try_new(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
732        let mut field_map = IndexMap::new();
733        let mut metadata_columns = HashMap::new();
734        let mut seen_lowercase_names = HashSet::new();
735
736        // Validate each field during insertion
737        for (i, field) in fields.into_iter().enumerate() {
738            // Verify that there are no nested metadata columns
739            if !matches!(field.data_type, DataType::Primitive(_)) {
740                Self::ensure_no_metadata_columns_in_field(&field)?;
741            }
742
743            // Check for duplicate metadata columns
744            if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
745                if metadata_columns.insert(metadata_column_spec, i).is_some() {
746                    return Err(Error::schema(format!(
747                        "Duplicate metadata column: {metadata_column_spec:?}",
748                    )));
749                }
750            }
751
752            // Delta column names are case-insensitive; reject schemas with duplicates that differ
753            // only by case.
754            let key = field.name.to_lowercase();
755            if !seen_lowercase_names.insert(key) {
756                return Err(Error::schema(format!(
757                    "Duplicate field name (case-insensitive): '{}'",
758                    field.name
759                )));
760            }
761
762            field_map.insert(field.name.clone(), field);
763        }
764
765        Ok(Self {
766            type_name: "struct".into(),
767            fields: field_map,
768            metadata_columns,
769        })
770    }
771
772    /// Creates a new [`StructType`] from a fallible iterator of fields.
773    ///
774    /// This constructor collects all fields from the iterator, returning the first error
775    /// encountered, or a new [`StructType`] if all fields are successfully collected and validated.
776    pub fn try_from_results<E: Into<Error>>(
777        fields: impl IntoIterator<Item = Result<StructField, E>>,
778    ) -> DeltaResult<Self> {
779        fields
780            .into_iter()
781            .map(|result| result.map_err(Into::into))
782            .process_results(|iter| Self::try_new(iter))?
783    }
784
    /// Returns a new [`StructTypeBuilder`], as produced by [`StructTypeBuilder::new`].
    pub fn builder() -> StructTypeBuilder {
        StructTypeBuilder::new()
    }
788
789    /// Creates a new [`StructType`] from the given fields without validating them.
790    ///
791    /// This should only be used when you are sure that the fields are valid.
792    /// Refer to [`StructType::try_new`] for more details on the validation checks.
793    #[internal_api]
794    pub(crate) fn new_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
795        let mut field_map = IndexMap::new();
796        let mut metadata_columns = HashMap::new();
797
798        for (i, field) in fields.into_iter().enumerate() {
799            if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
800                metadata_columns.insert(metadata_column_spec, i);
801            }
802            field_map.insert(field.name.clone(), field);
803        }
804
805        Self {
806            type_name: "struct".into(),
807            fields: field_map,
808            metadata_columns,
809        }
810    }
811
812    /// Gets a [`StructType`] containing [`StructField`]s of the given names. The order of fields in
813    /// the returned schema will match the order passed to this function, which can be different
814    /// from this order in this schema. Returns an Err if a specified field doesn't exist.
815    pub fn project_as_struct(&self, names: &[impl AsRef<str>]) -> DeltaResult<StructType> {
816        let fields = names.iter().map(|name| {
817            self.fields
818                .get(name.as_ref())
819                .cloned()
820                .ok_or_else(|| Error::missing_column(name.as_ref()))
821        });
822        Self::try_from_results(fields)
823    }
824
825    /// Gets a [`SchemaRef`] containing [`StructField`]s of the given names. The order of fields in
826    /// the returned schema will match the order passed to this function, which can be different
827    /// from this order in this schema. Returns an Err if a specified field doesn't exist.
828    pub fn project(&self, names: &[impl AsRef<str>]) -> DeltaResult<SchemaRef> {
829        let struct_type = self.project_as_struct(names)?;
830        Ok(Arc::new(struct_type))
831    }
832
833    /// Adds fields to this [`StructType`], returning a new [`StructType`].
834    pub fn add(&self, fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
835        Self::try_new(self.fields.values().cloned().chain(fields))
836    }
837
838    /// Adds a predefined metadata column to this [`StructType`], returning a new [`StructType`].
839    pub fn add_metadata_column(
840        &self,
841        name: impl Into<String>,
842        spec: MetadataColumnSpec,
843    ) -> DeltaResult<Self> {
844        self.add([StructField::create_metadata_column(name, spec)])
845    }
846
847    /// Returns the index of the field with the given name, or None if not found.
848    pub fn index_of(&self, name: impl AsRef<str>) -> Option<usize> {
849        self.fields.get_index_of(name.as_ref())
850    }
851
    /// Returns the index of the metadata column with the given spec, or None if not found.
    ///
    /// The index is read from the `metadata_columns` lookup table rather than computed by
    /// scanning the fields.
    pub fn index_of_metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&usize> {
        self.metadata_columns.get(spec)
    }
856
857    /// Checks if the [`StructType`] contains a field with the specified name.
858    pub fn contains(&self, name: impl AsRef<str>) -> bool {
859        self.fields.contains_key(name.as_ref())
860    }
861
862    /// Checks if the [`StructType`] contains a metadata column with the given spec.
863    pub fn contains_metadata_column(&self, spec: &MetadataColumnSpec) -> bool {
864        self.metadata_columns.contains_key(spec)
865    }
866
867    /// Gets the field with the given name.
868    pub fn field(&self, name: impl AsRef<str>) -> Option<&StructField> {
869        self.fields.get(name.as_ref())
870    }
871
872    /// Resolves a column path through nested structs, returning references to all
873    /// [`StructField`]s along the path. The last element is the leaf field.
874    ///
875    /// Each element of the path must resolve to a field in the current struct. All intermediate
876    /// (non-leaf) fields must be struct types.
877    ///
878    /// Returns an error if the path is empty, a field is not found, or an intermediate
879    /// field is not a struct type.
880    #[internal_api]
881    pub(crate) fn walk_column_fields<'a>(
882        &'a self,
883        col: &ColumnName,
884    ) -> DeltaResult<Vec<&'a StructField>> {
885        self.walk_column_fields_by(col, |s, name| s.field(name))
886    }
887
888    /// Helper to walk through nested columns. For each path component in `col`, calls
889    ///
890    /// `find_field(current_struct, component)` to locate the matching field, then descends
891    ///
892    /// into the next nested struct. Returns references to all [`StructField`]s along the path.
893    pub(crate) fn walk_column_fields_by<'a, F>(
894        &'a self,
895        col: &ColumnName,
896        find_field: F,
897    ) -> DeltaResult<Vec<&'a StructField>>
898    where
899        F: for<'b> Fn(&'b StructType, &str) -> Option<&'b StructField>,
900    {
901        let path = col.path();
902        if path.is_empty() {
903            return Err(Error::generic("Column path cannot be empty"));
904        }
905        let mut current_struct = self;
906        let mut fields = Vec::with_capacity(path.len());
907        for (i, field_name) in path.iter().enumerate() {
908            let field = find_field(current_struct, field_name).ok_or_else(|| {
909                Error::generic(format!(
910                    "Could not resolve column '{col}': field '{field_name}' not found in schema"
911                ))
912            })?;
913            fields.push(field);
914            if i < path.len() - 1 {
915                let DataType::Struct(inner) = field.data_type() else {
916                    return Err(Error::generic(format!(
917                        "Cannot resolve column '{col}': intermediate field '{field_name}' \
918                         is not a struct type"
919                    )));
920                };
921                current_struct = inner;
922            }
923        }
924        Ok(fields)
925    }
926
927    /// Gets the field with the given name and its index.
928    pub fn field_with_index(&self, name: impl AsRef<str>) -> Option<(usize, &StructField)> {
929        self.fields
930            .get_full(name.as_ref())
931            .map(|(index, _, field)| (index, field))
932    }
933
934    /// Gets the field at the given index.
935    pub fn field_at_index(&self, index: usize) -> Option<&StructField> {
936        self.fields.get_index(index).map(|(_, field)| field)
937    }
938
    /// Returns an iterator over references to the fields of this struct type, in the iteration
    /// order of the underlying field map.
    ///
    /// The returned iterator is exact-size, double-ended and fused.
    pub fn fields(
        &self,
    ) -> impl ExactSizeIterator<Item = &StructField> + DoubleEndedIterator + FusedIterator {
        self.fields.values()
    }
945
    /// Consumes this struct type and returns an iterator over its owned fields, in the
    /// iteration order of the underlying field map.
    ///
    /// The returned iterator is exact-size, double-ended and fused.
    pub fn into_fields(
        self,
    ) -> impl ExactSizeIterator<Item = StructField> + DoubleEndedIterator + FusedIterator {
        self.fields.into_values()
    }
952
    /// Gets a mutable reference to the underlying field map.
    ///
    /// NOTE(review): this hands out the map directly and does not touch `metadata_columns`;
    /// callers presumably keep map keys, field names and metadata-column positions
    /// consistent themselves -- confirm at the call sites.
    pub(crate) fn field_map_mut(&mut self) -> &mut IndexMap<String, StructField> {
        &mut self.fields
    }
957
958    /// Walk a pre-segmented column path through this schema and return the leaf field.
959    ///
960    /// `path` is the path's individual name segments (one per nesting level), already split by
961    /// the caller. Lookup at each level is case-insensitive. The Delta protocol uses `.` as the
962    /// dotted-path separator at the API surface, but `field_at_path` itself does not split --
963    /// callers typically pass [`ColumnName::path()`](crate::expressions::ColumnName::path),
964    /// which yields the segments directly.
965    ///
966    /// Panics if any segment is missing or an intermediate field is not a struct. Intended for
967    /// use in test assertions.
968    ///
969    /// # Example
970    ///
971    /// ```ignore
972    /// // Schema:
973    /// //   id:      INTEGER  not null
974    /// //   address: STRUCT { city: STRING not null, zip: STRING }
975    /// let path = vec!["address".to_string(), "city".to_string()];
976    /// let city = schema.field_at_path(&path);
977    /// assert_eq!(city.name(), "city");
978    /// assert!(!city.is_nullable());
979    /// ```
980    #[cfg(any(test, feature = "test-utils"))]
981    #[allow(clippy::panic, clippy::expect_used)]
982    pub fn field_at_path<'a>(&'a self, path: &[String]) -> &'a StructField {
983        fn find_ci<'a>(
984            mut fields: impl Iterator<Item = &'a StructField>,
985            name: &str,
986        ) -> &'a StructField {
987            let lowered = name.to_lowercase();
988            fields
989                .find(|f| f.name().to_lowercase() == lowered)
990                .unwrap_or_else(|| panic!("field '{name}' not found"))
991        }
992        let (first, rest) = path.split_first().expect("non-empty path");
993        let mut field = find_ci(self.fields(), first);
994        for seg in rest {
995            let DataType::Struct(s) = field.data_type() else {
996                panic!("expected struct at intermediate segment '{seg}'");
997            };
998            field = find_ci(s.fields(), seg);
999        }
1000        field
1001    }
1002
    /// Gets all the field names in this struct type in the order they are defined.
    ///
    /// The names are the keys of the underlying field map, yielded by reference.
    pub fn field_names(&self) -> impl ExactSizeIterator<Item = &String> {
        self.fields.keys()
    }
1007
    /// Gets the number of top-level fields in this struct type. Fields nested inside other
    /// fields are not counted; see `total_struct_fields` for a recursive count.
    pub fn num_fields(&self) -> usize {
        // O(1) for indexmap
        self.fields.len()
    }
1013
1014    /// Recursively counts all [`StructField`] nodes in this schema tree.
1015    ///
1016    /// This includes nested struct fields (inside Struct, Array, and Map types) but does not
1017    /// count Array/Map containers themselves. This matches the traversal pattern used by
1018    /// `assign_column_mapping_metadata` when assigning column IDs, so the result equals the
1019    /// expected `delta.columnMapping.maxColumnId` for a newly created table.
1020    #[allow(unused)] // Only used by integration tests (create_table/column_mapping.rs)
1021    #[internal_api]
1022    pub(crate) fn total_struct_fields(&self) -> usize {
1023        fn count_data_type(dt: &DataType) -> usize {
1024            match dt {
1025                DataType::Struct(inner) => inner.total_struct_fields(),
1026                DataType::Array(array) => count_data_type(array.element_type()),
1027                DataType::Map(map) => {
1028                    count_data_type(map.key_type()) + count_data_type(map.value_type())
1029                }
1030                _ => 0,
1031            }
1032        }
1033        self.fields()
1034            .map(|field| 1 + count_data_type(field.data_type()))
1035            .sum()
1036    }
1037
1038    /// Gets a reference to the metadata column with the given spec.
1039    pub fn metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&StructField> {
1040        self.metadata_columns
1041            .get(spec)
1042            .and_then(|index| self.fields.get_index(*index).map(|(_, field)| field))
1043    }
1044
1045    /// Gets an iterator over all the metadata columns in this struct type.
1046    pub fn metadata_columns(&self) -> impl Iterator<Item = &StructField> {
1047        self.metadata_columns
1048            .values()
1049            .filter_map(|index| self.fields.get_index(*index).map(|(_, field)| field))
1050    }
1051
1052    /// Extracts the name and type of all leaf columns, in schema order. Caller should pass Some
1053    /// `own_name` if this schema is embedded in a larger struct (e.g. `add.*`) and None if the
1054    /// schema is a top-level result (e.g. `*`).
1055    ///
1056    /// NOTE: This method only traverses through `StructType` fields; `MapType` and `ArrayType`
1057    /// fields are considered leaves even if they contain `StructType` entries/elements.
1058    #[allow(unused)]
1059    #[internal_api]
1060    pub(crate) fn leaves<'s>(&self, own_name: impl Into<Option<&'s str>>) -> ColumnNamesAndTypes {
1061        let mut get_leaves = GetSchemaLeaves::new(own_name.into());
1062        get_leaves.transform_struct(self);
1063        (get_leaves.names, get_leaves.types).into()
1064    }
1065
1066    /// Applies physical name mappings to this field. If the `column_mapping_mode` is
1067    /// [`ColumnMappingMode::Id`], then each StructField will have its parquet field id in the
1068    /// [`ColumnMetadataKey::ParquetFieldId`] metadata field.
1069    ///
1070    /// Uses a single transformer so duplicate column mapping IDs are detected across all
1071    /// fields in this struct, not just within each field's subtree.
1072    #[internal_api]
1073    pub(crate) fn make_physical(
1074        &self,
1075        column_mapping_mode: ColumnMappingMode,
1076    ) -> DeltaResult<Self> {
1077        let mut transformer = MakePhysical::new(column_mapping_mode);
1078        transformer.transform_struct(self).map(|s| s.into_owned())
1079    }
1080
1081    /// Validates that there are no metadata columns in the given fields.
1082    pub(crate) fn ensure_no_metadata_columns(
1083        fields: &mut dyn Iterator<Item = &StructField>,
1084    ) -> DeltaResult<()> {
1085        for field in fields {
1086            Self::ensure_no_metadata_columns_in_field(field)?;
1087        }
1088        Ok(())
1089    }
1090
1091    /// Validates that there are no metadata columns in the given field.
1092    pub(crate) fn ensure_no_metadata_columns_in_field(field: &StructField) -> DeltaResult<()> {
1093        if field.is_metadata_column() {
1094            return Err(Error::schema(
1095                "Metadata columns are only allowed at the top level of a schema".to_string(),
1096            ));
1097        }
1098
1099        match &field.data_type {
1100            DataType::Struct(struct_type) => {
1101                // Only check leaf fields; nested structs were validated at their creation
1102                Self::ensure_no_metadata_columns(&mut struct_type.fields().filter(|f| {
1103                    !matches!(f.data_type, DataType::Struct(_) | DataType::Variant(_))
1104                }))?;
1105            }
1106            DataType::Array(array_type) => {
1107                if let DataType::Struct(struct_type) = array_type.element_type() {
1108                    Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1109                }
1110            }
1111            DataType::Map(map_type) => {
1112                if let DataType::Struct(struct_type) = map_type.key_type() {
1113                    Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1114                }
1115                if let DataType::Struct(struct_type) = map_type.value_type() {
1116                    Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1117                }
1118            }
1119            // Primitive types cannot contain nested metadata columns and variant types are
1120            // validated at creation
1121            DataType::Primitive(_) | DataType::Variant(_) => {}
1122        };
1123
1124        Ok(())
1125    }
1126
1127    /// Returns a StructType with `new_field` inserted after the field named `after`.
1128    /// If `new_field`  already presents in the schema, an error is returned.
1129    /// If `after` is None, `new_field` is appended to the end.
1130    /// If `after` is not found, an error is returned.
1131    pub fn with_field_inserted_after(
1132        mut self,
1133        after: Option<&str>,
1134        new_field: StructField,
1135    ) -> DeltaResult<Self> {
1136        // TODO: Upgrade to a case-insensitive duplicate check when this method is used for
1137        // user-facing operations like ALTER TABLE ADD COLUMN. Currently only used internally
1138        // for inserting protocol-defined fields (e.g. stats_parsed) where exact-name matching
1139        // is sufficient.
1140        if self.fields.contains_key(&new_field.name) {
1141            return Err(Error::generic(format!(
1142                "Field {} already exists",
1143                new_field.name
1144            )));
1145        }
1146
1147        let insert_index = after
1148            .map(|after| {
1149                self.fields
1150                    .get_index_of(after)
1151                    .map(|index| index + 1)
1152                    .ok_or_else(|| Error::generic(format!("Field {after} not found")))
1153            })
1154            .unwrap_or_else(|| Ok(self.fields.len()))?;
1155
1156        self.fields
1157            .insert_before(insert_index, new_field.name.clone(), new_field);
1158        Ok(self)
1159    }
1160
1161    /// Returns a StructType with `new_field` inserted before the field named `before`.
1162    /// If `new_field` already presents in the schema, an error is returned.
1163    /// If `before` is None, `new_field` is inserted at the beginning.
1164    /// If `before` is not found, an error is returned.
1165    pub fn with_field_inserted_before(
1166        mut self,
1167        before: Option<&str>,
1168        new_field: StructField,
1169    ) -> DeltaResult<Self> {
1170        // TODO: Upgrade to a case-insensitive duplicate check when this method is used for
1171        // user-facing operations like ALTER TABLE ADD COLUMN. Currently only used internally
1172        // for inserting protocol-defined fields where exact-name matching is sufficient.
1173        if self.fields.contains_key(&new_field.name) {
1174            return Err(Error::generic(format!(
1175                "Field {} already exists",
1176                new_field.name
1177            )));
1178        }
1179
1180        let index_of_before = before
1181            .map(|before| {
1182                self.fields
1183                    .get_index_of(before)
1184                    .ok_or_else(|| Error::generic(format!("Field {before} not found")))
1185            })
1186            .unwrap_or_else(|| Ok(0))?;
1187
1188        self.fields
1189            .insert_before(index_of_before, new_field.name.clone(), new_field);
1190        Ok(self)
1191    }
1192
1193    /// Returns a StructType with the named field removed.
1194    /// Returns self unchanged if field doesn't exist.
1195    pub fn with_field_removed(mut self, name: &str) -> Self {
1196        self.fields.shift_remove(name);
1197        self
1198    }
1199
1200    /// Returns a new [`StructType`] containing only the top-level fields for which `predicate`
1201    /// returns `true`. This does not recurse into nested [`StructType`] fields.
1202    pub fn with_fields_filtered(
1203        &self,
1204        predicate: impl Fn(&StructField) -> bool,
1205    ) -> DeltaResult<Self> {
1206        Self::try_new(self.fields().filter(|f| predicate(f)).cloned())
1207    }
1208
1209    /// Returns an optional [`StructType`] containing only the top-level fields for which
1210    /// `predicate` returns `true`.
1211    ///
1212    /// This is a convenience wrapper around [`StructType::with_fields_filtered`] for callers
1213    /// that treat an empty top-level struct as "no schema".
1214    pub fn with_fields_filtered_nonempty(
1215        &self,
1216        predicate: impl Fn(&StructField) -> bool,
1217    ) -> DeltaResult<Option<Self>> {
1218        let filtered = self.with_fields_filtered(predicate)?;
1219        if filtered.num_fields() == 0 {
1220            Ok(None)
1221        } else {
1222            Ok(Some(filtered))
1223        }
1224    }
1225
1226    /// Returns a StructType with the named field replaced.
1227    /// Returns an error if field doesn't exist.
1228    pub fn with_field_replaced(
1229        mut self,
1230        name: &str,
1231        new_field: StructField,
1232    ) -> DeltaResult<StructType> {
1233        let replace_field = self
1234            .fields
1235            .get_mut(name)
1236            .ok_or_else(|| Error::generic(format!("Field {name} not found")))?;
1237
1238        *replace_field = new_field;
1239        Ok(self)
1240    }
1241}
1242
/// Writes the tree-drawing prefix for one line of the schema display.
///
/// `levels` holds one flag per nesting level, each telling whether the entry on that level is
/// the last among its siblings. Every level but the final one contributes either blank padding
/// (last) or a vertical bar (not last); the final level contributes the branch glyph. Nothing
/// is written for an empty `levels`.
fn write_indent(f: &mut Formatter<'_>, levels: &[bool]) -> std::fmt::Result {
    let Some((branch_is_last, ancestors)) = levels.split_last() else {
        return Ok(());
    };

    // Parent levels → vertical line or empty space
    for ancestor_is_last in ancestors {
        f.write_str(if *ancestor_is_last { "   " } else { "│  " })?;
    }

    // Final level → draw branch
    f.write_str(if *branch_is_last { "└─" } else { "├─" })
}
1259
1260fn write_struct_type(
1261    st: &StructType,
1262    f: &mut Formatter<'_>,
1263    levels: &mut Vec<bool>,
1264) -> std::fmt::Result {
1265    let len = st.fields.len();
1266
1267    for (i, (_, field)) in st.fields.iter().enumerate() {
1268        let is_last = i + 1 == len;
1269        levels.push(is_last);
1270
1271        write_indent(f, levels)?;
1272        writeln!(f, "{field}")?;
1273
1274        field.data_type.fmt_recursive(f, levels)?;
1275
1276        levels.pop();
1277    }
1278    Ok(())
1279}
1280
1281impl Display for StructType {
1282    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1283        writeln!(f, "{}:", self.type_name)?;
1284        let mut levels = Vec::new();
1285        write_struct_type(self, f, &mut levels)
1286    }
1287}
1288
1289impl IntoIterator for StructType {
1290    type Item = StructField;
1291    type IntoIter = StructFieldIntoIter;
1292
1293    fn into_iter(self) -> Self::IntoIter {
1294        StructFieldIntoIter {
1295            inner: self.fields.into_values(),
1296        }
1297    }
1298}
1299
1300impl<'a> IntoIterator for &'a StructType {
1301    type Item = &'a StructField;
1302    type IntoIter = StructFieldRefIter<'a>;
1303
1304    fn into_iter(self) -> Self::IntoIter {
1305        StructFieldRefIter {
1306            inner: self.fields.values(),
1307        }
1308    }
1309}
1310
/// An iterator that yields owned [`StructField`]s from a [`StructType`].
///
/// This iterator is returned by the [`IntoIterator`] implementation for [`StructType`] and
/// consumes the original struct. It yields each field in the order they were defined in the
/// schema, preserving the insertion order maintained by the underlying [`IndexMap`].
///
/// # Examples
///
/// ```
/// # use buoyant_kernel as delta_kernel;
/// # use delta_kernel::Error;
/// use delta_kernel::schema::{StructType, StructField, DataType};
///
/// let fields = vec![
///     StructField::new("name", DataType::STRING, false),
///     StructField::new("age", DataType::INTEGER, true),
/// ];
/// let struct_type = StructType::try_new(fields)?;
///
/// // Consume the struct_type and iterate over owned fields
/// for field in struct_type {
///     println!("Field: {} ({})", field.name(), field.data_type());
/// }
/// # Ok::<(), Error>(())
/// ```
///
/// [`IndexMap`]: indexmap::IndexMap
#[derive(Debug)]
pub struct StructFieldIntoIter {
    // Owning iterator over the values of the struct type's field map; every trait method of
    // this wrapper forwards to it.
    inner: indexmap::map::IntoValues<String, StructField>,
}
1342
// Every method forwards to the wrapped `indexmap` iterator. `count`, `last` and `nth` are
// forwarded explicitly rather than left to the trait's default implementations -- presumably
// so the inner iterator's own implementations are used (confirm against `indexmap`).
impl Iterator for StructFieldIntoIter {
    type Item = StructField;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }

    fn count(self) -> usize {
        self.inner.count()
    }

    fn last(self) -> Option<Self::Item> {
        self.inner.last()
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.inner.nth(n)
    }
}
1366
// The number of remaining fields is whatever the wrapped iterator reports.
impl ExactSizeIterator for StructFieldIntoIter {
    fn len(&self) -> usize {
        self.inner.len()
    }
}
1372
// Marker impl: `next` forwards to the wrapped iterator, so this relies on that iterator
// continuing to return `None` once exhausted.
impl FusedIterator for StructFieldIntoIter {}
1374
// Allows taking fields from the back; forwards to the wrapped iterator.
impl DoubleEndedIterator for StructFieldIntoIter {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.inner.next_back()
    }
}
1380
/// An iterator that yields references to [`StructField`]s from a [`StructType`].
///
/// This iterator is returned by the [`IntoIterator`] implementation for `&StructType`. Unlike
/// [`StructFieldIntoIter`], this iterator does not consume the original struct and yields
/// references to the fields. It preserves the insertion order maintained by the underlying
/// [`IndexMap`].
///
/// The [`StructType::fields()`] method iterates over the same field values, but returns an
/// opaque iterator type rather than this struct.
///
/// This iterator implements [`Clone`], allowing you to create multiple independent iterators
/// over the same set of fields.
///
/// # Examples
///
/// ```
/// # use buoyant_kernel as delta_kernel;
/// # use delta_kernel::Error;
/// use delta_kernel::schema::{StructType, StructField, DataType};
///
/// let fields = vec![
///     StructField::new("name", DataType::STRING, false),
///     StructField::new("age", DataType::INTEGER, true),
/// ];
/// let struct_type = StructType::try_new(fields)?;
///
/// // Iterate over field references without consuming the struct_type
/// for field in &struct_type {
///     println!("Field: {} ({})", field.name(), field.data_type());
/// }
///
/// // struct_type is still available for use
/// assert_eq!(struct_type.field("name").unwrap().name(), "name");
///
/// // Or use the fields() method explicitly
/// for field in struct_type.fields() {
///     println!("Field type: {}", field.data_type());
/// }
/// # Ok::<(), Error>(())
/// ```
///
/// [`StructType::fields()`]: StructType::fields
/// [`IndexMap`]: indexmap::IndexMap
#[derive(Debug, Clone)]
pub struct StructFieldRefIter<'a> {
    // Borrowing iterator over the values of the struct type's field map; every trait method
    // of this wrapper forwards to it.
    inner: indexmap::map::Values<'a, String, StructField>,
}
1425
1426impl<'a> Iterator for StructFieldRefIter<'a> {
1427    type Item = &'a StructField;
1428
1429    fn next(&mut self) -> Option<Self::Item> {
1430        self.inner.next()
1431    }
1432
1433    fn size_hint(&self) -> (usize, Option<usize>) {
1434        self.inner.size_hint()
1435    }
1436}
1437
// The number of remaining fields is whatever the wrapped iterator reports.
impl ExactSizeIterator for StructFieldRefIter<'_> {
    fn len(&self) -> usize {
        self.inner.len()
    }
}
1443
// Marker impl: `next` forwards to the wrapped iterator, so this relies on that iterator
// continuing to return `None` once exhausted.
impl FusedIterator for StructFieldRefIter<'_> {}
1445
// Allows taking field references from the back; forwards to the wrapped iterator.
impl DoubleEndedIterator for StructFieldRefIter<'_> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.inner.next_back()
    }
}
1451
1452struct InvariantChecker;
1453
1454impl<'a> SchemaTransform<'a> for InvariantChecker {
1455    transform_output_type!(|'a, T| Result<(), ()>);
1456
1457    fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1458        if field.has_invariants() {
1459            Err(())
1460        } else {
1461            self.recurse_into_struct_field(field)
1462        }
1463    }
1464}
1465
1466/// Checks if any column in the schema (including nested columns) has invariants defined.
1467///
1468/// This traverses the entire schema to check for the presence of the `delta.invariants`
1469/// metadata key.
1470pub(crate) fn schema_has_invariants(schema: &Schema) -> bool {
1471    InvariantChecker.transform_struct(schema).is_err()
1472}
1473
1474/// Visitor that reports whether any non-null (`nullable: false`) field exists in a schema.
1475/// Walks the full schema tree including nested struct, array element, and map value structs.
1476struct NonNullFieldChecker;
1477
1478impl<'a> SchemaTransform<'a> for NonNullFieldChecker {
1479    transform_output_type!(|'a, T| Result<(), ()>);
1480
1481    /// Skip recursion into variant internals. The `metadata` and `value` fields inside a
1482    /// `Variant` are protocol-defined, always non-null, and not user-controlled, so they
1483    /// must not be treated as user-declared non-null columns.
1484    fn transform_variant(&mut self, _stype: &'a StructType) -> Result<(), ()> {
1485        Ok(())
1486    }
1487
1488    fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1489        if !field.is_nullable() {
1490            return Err(());
1491        }
1492
1493        self.recurse_into_struct_field(field)
1494    }
1495}
1496
1497/// Checks if any user-controlled column in the schema (including nested columns) is declared
1498/// non-null (`nullable: false`).
1499///
1500/// Skips `Variant` internal struct fields, which are protocol-defined and always non-null.
1501pub(crate) fn schema_contains_non_null_fields(schema: &Schema) -> bool {
1502    NonNullFieldChecker.transform_struct(schema).is_err()
1503}
1504
1505/// Normalizes column name field names to match the casing in the schema.
1506///
1507/// Walks each field name through the schema's struct hierarchy, replacing user-provided
1508/// casing with the schema's canonical casing. If a field name isn't found
1509/// case-insensitively, keeps the original (subsequent validation catches it).
1510///
1511/// For example, given schema `{ Id: int, Name: string }` and user-provided columns
1512/// `["id", "name"]`, returns `["Id", "Name"]` -- matching the schema's canonical casing.
1513///
1514/// Note: Must be called before validation (`validate_partition_columns` or
1515/// `validate_clustering_columns`) so that case-normalized names match the schema.
1516pub(crate) fn normalize_column_names_to_schema_casing(
1517    schema: &StructType,
1518    columns: &[ColumnName],
1519) -> Vec<ColumnName> {
1520    columns
1521        .iter()
1522        .map(|col| {
1523            let path = col.path();
1524            let mut normalized: Vec<String> = Vec::with_capacity(path.len());
1525            let mut current_schema = schema;
1526            for (i, field_name) in path.iter().enumerate() {
1527                match current_schema
1528                    .fields()
1529                    .find(|f| f.name().eq_ignore_ascii_case(field_name))
1530                {
1531                    Some(f) => {
1532                        normalized.push(f.name().to_string());
1533                        if let DataType::Struct(inner) = f.data_type() {
1534                            current_schema = inner;
1535                        }
1536                    }
1537                    None => {
1538                        // Field name not found at this level -- keep remaining path
1539                        // unchanged so validation reports the user's original input.
1540                        normalized.extend(path[i..].iter().cloned());
1541                        break;
1542                    }
1543                }
1544            }
1545            ColumnName::new(normalized.iter().map(|s| s.as_str()))
1546        })
1547        .collect()
1548}
1549
1550/// Helper for RowVisitor implementations
1551#[internal_api]
1552#[derive(Clone, Default)]
1553pub(crate) struct ColumnNamesAndTypes(Vec<ColumnName>, Vec<DataType>);
1554impl ColumnNamesAndTypes {
1555    #[internal_api]
1556    pub(crate) fn as_ref(&self) -> (&[ColumnName], &[DataType]) {
1557        (&self.0, &self.1)
1558    }
1559}
1560
1561impl From<(Vec<ColumnName>, Vec<DataType>)> for ColumnNamesAndTypes {
1562    fn from((names, fields): (Vec<ColumnName>, Vec<DataType>)) -> Self {
1563        ColumnNamesAndTypes(names, fields)
1564    }
1565}
1566
/// Serialization shape of a [`StructType`]: the type name (under the key `type`) and the
/// fields as a plain list. Used by the hand-written `Serialize`/`Deserialize` impls of
/// [`StructType`].
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct StructTypeSerDeHelper {
    #[serde(rename = "type")]
    type_name: String,
    fields: Vec<StructField>,
}
1574
1575impl Serialize for StructType {
1576    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1577    where
1578        S: serde::Serializer,
1579    {
1580        StructTypeSerDeHelper {
1581            type_name: self.type_name.clone(),
1582            fields: self.fields.values().cloned().collect(),
1583        }
1584        .serialize(serializer)
1585    }
1586}
1587
1588impl<'de> Deserialize<'de> for StructType {
1589    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1590    where
1591        D: serde::Deserializer<'de>,
1592        Self: Sized,
1593    {
1594        let helper = StructTypeSerDeHelper::deserialize(deserializer)?;
1595        StructType::try_new(helper.fields).map_err(serde::de::Error::custom)
1596    }
1597}
1598
1599#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
1600#[serde(rename_all = "camelCase")]
1601pub struct ArrayType {
1602    #[serde(rename = "type")]
1603    pub type_name: String,
1604    /// The type of element stored in this array
1605    pub element_type: DataType,
1606    /// Denoting whether this array can contain one or more null values
1607    pub contains_null: bool,
1608}
1609
1610impl ArrayType {
1611    pub fn new(element_type: DataType, contains_null: bool) -> Self {
1612        Self {
1613            type_name: "array".into(),
1614            element_type,
1615            contains_null,
1616        }
1617    }
1618
1619    #[inline]
1620    pub const fn element_type(&self) -> &DataType {
1621        &self.element_type
1622    }
1623
1624    #[inline]
1625    pub const fn contains_null(&self) -> bool {
1626        self.contains_null
1627    }
1628}
1629
1630#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
1631#[serde(rename_all = "camelCase")]
1632pub struct MapType {
1633    #[serde(rename = "type")]
1634    pub type_name: String,
1635    /// The type of element used for the key of this map
1636    pub key_type: DataType,
1637    /// The type of element used for the value of this map
1638    pub value_type: DataType,
1639    /// Denoting whether this map can contain one or more null values
1640    #[serde(default = "default_true")]
1641    pub value_contains_null: bool,
1642}
1643
1644impl MapType {
1645    pub fn new(
1646        key_type: impl Into<DataType>,
1647        value_type: impl Into<DataType>,
1648        value_contains_null: bool,
1649    ) -> Self {
1650        Self {
1651            type_name: "map".into(),
1652            key_type: key_type.into(),
1653            value_type: value_type.into(),
1654            value_contains_null,
1655        }
1656    }
1657
1658    #[inline]
1659    pub const fn key_type(&self) -> &DataType {
1660        &self.key_type
1661    }
1662
1663    #[inline]
1664    pub const fn value_type(&self) -> &DataType {
1665        &self.value_type
1666    }
1667
1668    #[inline]
1669    pub const fn value_contains_null(&self) -> bool {
1670        self.value_contains_null
1671    }
1672
1673    /// Create a schema assuming the map is stored as a struct with the specified key and value
1674    /// field names
1675    pub fn as_struct_schema(&self, key_name: String, val_name: String) -> Schema {
1676        StructType::new_unchecked([
1677            StructField::not_null(key_name, self.key_type.clone()),
1678            StructField::new(val_name, self.value_type.clone(), self.value_contains_null),
1679        ])
1680    }
1681}
1682
/// Serde default provider that always yields `true` (used for
/// `MapType::value_contains_null` when the key is missing from the input).
fn default_true() -> bool {
    true
}
1686
1687#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
1688pub struct DecimalType {
1689    precision: u8,
1690    scale: u8,
1691}
1692
1693impl DecimalType {
1694    /// Check if the given precision and scale are valid for a decimal type.
1695    pub fn try_new(precision: u8, scale: u8) -> DeltaResult<Self> {
1696        require!(
1697            0 < precision && precision <= 38,
1698            Error::invalid_decimal(format!(
1699                "precision must be in range 1..38 inclusive, found: {precision}."
1700            ))
1701        );
1702        require!(
1703            scale <= precision,
1704            Error::invalid_decimal(format!(
1705                "scale must be in range 0..{precision} inclusive, found: {scale}."
1706            ))
1707        );
1708        Ok(Self { precision, scale })
1709    }
1710
1711    pub fn precision(&self) -> u8 {
1712        self.precision
1713    }
1714
1715    pub fn scale(&self) -> u8 {
1716        self.scale
1717    }
1718}
1719
1720#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
1721#[serde(rename_all = "camelCase")]
1722pub enum PrimitiveType {
1723    /// UTF-8 encoded string of characters
1724    String,
1725    /// i64: 8-byte signed integer. Range: -9223372036854775808 to 9223372036854775807
1726    Long,
1727    /// i32: 4-byte signed integer. Range: -2147483648 to 2147483647
1728    Integer,
1729    /// i16: 2-byte signed integer numbers. Range: -32768 to 32767
1730    Short,
1731    /// i8: 1-byte signed integer number. Range: -128 to 127
1732    Byte,
1733    /// f32: 4-byte single-precision floating-point numbers
1734    Float,
1735    /// f64: 8-byte double-precision floating-point numbers
1736    Double,
1737    /// bool: boolean values
1738    Boolean,
1739    Binary,
1740    Date,
1741    /// Microsecond precision timestamp, adjusted to UTC.
1742    Timestamp,
1743    #[serde(rename = "timestamp_ntz")]
1744    TimestampNtz,
1745    #[cfg(feature = "nanosecond-timestamps")]
1746    #[serde(rename = "timestamp_nanos")]
1747    TimestampNanos,
1748    #[serde(serialize_with = "serialize_decimal", untagged)]
1749    Decimal(DecimalType),
1750}
1751
1752impl PrimitiveType {
1753    pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
1754        Ok(DecimalType::try_new(precision, scale)?.into())
1755    }
1756
1757    /// Returns `true` if this primitive type can be widened to the `target` type.
1758    ///
1759    /// Widening rules:
1760    /// - Integer widening: byte -> short -> int -> long (Delta protocol type widening)
1761    /// - Float widening: float -> double (Delta protocol type widening)
1762    /// - Timestamp interchangeability: Timestamp <-> TimestampNtz (both are i64 microseconds since
1763    ///   epoch, differing only in timezone semantics; this is a physical read accommodation, not a
1764    ///   Delta protocol type widening rule)
1765    #[internal_api]
1766    pub(crate) fn can_widen_to(&self, target: &Self) -> bool {
1767        use PrimitiveType::*;
1768        matches!(
1769            (self, target),
1770            // Integer widening: smaller types can be read as larger ones
1771            (Byte, Short | Integer | Long)
1772                | (Short, Integer | Long)
1773                | (Integer, Long)
1774                // Float widening: float can be read as double
1775                | (Float, Double)
1776                // Timestamp equivalence: both are i64 microseconds since epoch, differing only
1777                // in timezone semantics. The parquet representation is identical, so reading
1778                // one as the other is safe at the data layer.
1779                | (Timestamp, TimestampNtz)
1780                | (TimestampNtz, Timestamp)
1781        )
1782    }
1783
1784    /// Returns `true` if `self` is a physical integer type that some checkpoint writers
1785    /// produce when they omit Parquet logical type annotations for date or timestamp columns.
1786    ///
1787    /// Specifically:
1788    /// - Integer -> Date (int32 stored without DATE annotation)
1789    /// - Long -> Timestamp/TimestampNtz (int64 stored without TIMESTAMP annotation)
1790    ///
1791    /// These are **not** Delta protocol type widening rules and must not be used outside of
1792    /// checkpoint compatibility checks.
1793    ///
1794    /// NOTE: The Arrow-level equivalent lives in `check_cast_compat` in
1795    /// `engine/ensure_data_types.rs`. Changes here must be mirrored there.
1796    pub(crate) fn is_checkpoint_cast_compatible(&self, target: &Self) -> bool {
1797        matches!(
1798            (self, target),
1799            (Self::Integer, Self::Date) | (Self::Long, Self::Timestamp | Self::TimestampNtz)
1800        )
1801    }
1802
1803    /// Returns `true` if this primitive type is compatible with `target` for reading
1804    /// `stats_parsed` columns from checkpoint parquet files.
1805    ///
1806    /// This is a superset of [`can_widen_to`]: it includes all Delta protocol type widening
1807    /// rules plus physical Parquet encoding accommodations for checkpoint interop (see
1808    /// [`is_checkpoint_cast_compatible`]).
1809    ///
1810    /// [`can_widen_to`]: PrimitiveType::can_widen_to
1811    /// [`is_checkpoint_cast_compatible`]: PrimitiveType::is_checkpoint_cast_compatible
1812    pub(crate) fn is_stats_type_compatible_with(&self, target: &Self) -> bool {
1813        self == target || self.can_widen_to(target) || self.is_checkpoint_cast_compatible(target)
1814    }
1815}
1816
1817fn serialize_decimal<S: serde::Serializer>(
1818    dtype: &DecimalType,
1819    serializer: S,
1820) -> Result<S::Ok, S::Error> {
1821    serializer.serialize_str(&format!("decimal({},{})", dtype.precision(), dtype.scale()))
1822}
1823
1824fn serialize_variant<S: serde::Serializer>(
1825    _: &StructType,
1826    serializer: S,
1827) -> Result<S::Ok, S::Error> {
1828    serializer.serialize_str("variant")
1829}
1830
1831// Custom Deserialize to provide clear error messages for unsupported types.
1832// The derived impl would produce: "unknown variant `interval second`, expected one of ..."
1833// This impl produces: "Unsupported Delta table type: 'interval second'"
1834impl<'de> serde::Deserialize<'de> for PrimitiveType {
1835    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1836    where
1837        D: serde::Deserializer<'de>,
1838    {
1839        let str_value = String::deserialize(deserializer)?;
1840
1841        match str_value.as_str() {
1842            "string" => Ok(PrimitiveType::String),
1843            "long" => Ok(PrimitiveType::Long),
1844            "integer" => Ok(PrimitiveType::Integer),
1845            "short" => Ok(PrimitiveType::Short),
1846            "byte" => Ok(PrimitiveType::Byte),
1847            "float" => Ok(PrimitiveType::Float),
1848            "double" => Ok(PrimitiveType::Double),
1849            "boolean" => Ok(PrimitiveType::Boolean),
1850            "binary" => Ok(PrimitiveType::Binary),
1851            "date" => Ok(PrimitiveType::Date),
1852            "timestamp" => Ok(PrimitiveType::Timestamp),
1853            "timestamp_ntz" => Ok(PrimitiveType::TimestampNtz),
1854            #[cfg(feature = "nanosecond-timestamps")]
1855            "timestamp_nanos" => Ok(PrimitiveType::TimestampNanos),
1856            decimal_str if decimal_str.starts_with("decimal(") && decimal_str.ends_with(')') => {
1857                // Parse decimal type
1858                let mut parts = decimal_str[8..decimal_str.len() - 1].split(',');
1859                let precision = parts
1860                    .next()
1861                    .and_then(|part| part.trim().parse::<u8>().ok())
1862                    .ok_or_else(|| {
1863                        serde::de::Error::custom(format!(
1864                            "Invalid precision in decimal: {decimal_str}"
1865                        ))
1866                    })?;
1867                let scale = parts
1868                    .next()
1869                    .and_then(|part| part.trim().parse::<u8>().ok())
1870                    .ok_or_else(|| {
1871                        serde::de::Error::custom(format!("Invalid scale in decimal: {decimal_str}"))
1872                    })?;
1873                // Reject extra parts (e.g., decimal(10,2,99))
1874                if parts.next().is_some() {
1875                    return Err(serde::de::Error::custom(format!(
1876                        "Invalid decimal format (expected 2 parts): {decimal_str}"
1877                    )));
1878                }
1879                DecimalType::try_new(precision, scale)
1880                    .map(PrimitiveType::Decimal)
1881                    .map_err(serde::de::Error::custom)
1882            }
1883            unsupported => Err(serde::de::Error::custom(format!(
1884                "Unsupported Delta table type: '{unsupported}'"
1885            ))),
1886        }
1887    }
1888}
1889
1890impl Display for PrimitiveType {
1891    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1892        match self {
1893            PrimitiveType::String => write!(f, "string"),
1894            PrimitiveType::Long => write!(f, "long"),
1895            PrimitiveType::Integer => write!(f, "integer"),
1896            PrimitiveType::Short => write!(f, "short"),
1897            PrimitiveType::Byte => write!(f, "byte"),
1898            PrimitiveType::Float => write!(f, "float"),
1899            PrimitiveType::Double => write!(f, "double"),
1900            PrimitiveType::Boolean => write!(f, "boolean"),
1901            PrimitiveType::Binary => write!(f, "binary"),
1902            PrimitiveType::Date => write!(f, "date"),
1903            PrimitiveType::Timestamp => write!(f, "timestamp"),
1904            PrimitiveType::TimestampNtz => write!(f, "timestamp_ntz"),
1905            #[cfg(feature = "nanosecond-timestamps")]
1906            PrimitiveType::TimestampNanos => write!(f, "timestamp_nanos"),
1907            PrimitiveType::Decimal(dtype) => {
1908                write!(f, "decimal({},{})", dtype.precision(), dtype.scale())
1909            }
1910        }
1911    }
1912}
1913
1914#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
1915#[serde(untagged, rename_all = "camelCase")]
1916pub enum DataType {
1917    /// UTF-8 encoded string of characters
1918    Primitive(PrimitiveType),
1919    /// An array stores a variable length collection of items of some type.
1920    Array(Box<ArrayType>),
1921    /// A struct is used to represent both the top-level schema of the table as well
1922    /// as struct columns that contain nested columns.
1923    Struct(Box<StructType>),
1924    /// A map stores an arbitrary length collection of key-value pairs
1925    /// with a single keyType and a single valueType
1926    Map(Box<MapType>),
1927    /// The Variant data type. The physical representation can be flexible to support shredded
1928    /// reads. The unshredded schema is `Variant(StructType<metadata: BINARY, value: BINARY>)`.
1929    #[serde(serialize_with = "serialize_variant")]
1930    Variant(Box<StructType>),
1931}
1932
1933impl From<DecimalType> for PrimitiveType {
1934    fn from(dtype: DecimalType) -> Self {
1935        PrimitiveType::Decimal(dtype)
1936    }
1937}
1938impl From<DecimalType> for DataType {
1939    fn from(dtype: DecimalType) -> Self {
1940        PrimitiveType::from(dtype).into()
1941    }
1942}
1943impl From<PrimitiveType> for DataType {
1944    fn from(ptype: PrimitiveType) -> Self {
1945        DataType::Primitive(ptype)
1946    }
1947}
1948impl From<MapType> for DataType {
1949    fn from(map_type: MapType) -> Self {
1950        DataType::Map(Box::new(map_type))
1951    }
1952}
1953
1954impl From<StructType> for DataType {
1955    fn from(struct_type: StructType) -> Self {
1956        DataType::Struct(Box::new(struct_type))
1957    }
1958}
1959
1960impl From<ArrayType> for DataType {
1961    fn from(array_type: ArrayType) -> Self {
1962        DataType::Array(Box::new(array_type))
1963    }
1964}
1965
1966impl From<SchemaRef> for DataType {
1967    fn from(schema: SchemaRef) -> Self {
1968        Arc::unwrap_or_clone(schema).into()
1969    }
1970}
1971
1972// Custom Deserialize to preserve error messages from PrimitiveType.
1973// Serde's untagged enum only reports the last variant's error, discarding PrimitiveType's
1974// clear "Unsupported Delta table type: 'X'" message. We deserialize to Value first, then
1975// dispatch based on structure (string -> Primitive/Variant, object -> Array/Struct/Map).
1976impl<'de> serde::Deserialize<'de> for DataType {
1977    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1978    where
1979        D: serde::Deserializer<'de>,
1980    {
1981        use serde::de::Error;
1982        use serde_json::Value;
1983
1984        let value = Value::deserialize(deserializer)?;
1985
1986        // String values are either primitive types or "variant"
1987        if let Value::String(s) = &value {
1988            if s == "variant" {
1989                return match DataType::unshredded_variant() {
1990                    DataType::Variant(st) => Ok(DataType::Variant(st)),
1991                    _ => Err(Error::custom("Failed to create variant type")),
1992                };
1993            }
1994
1995            // Try PrimitiveType - this will give us good error messages for unsupported types
1996            return PrimitiveType::deserialize(value.clone())
1997                .map(DataType::Primitive)
1998                .map_err(|e| Error::custom(e.to_string()));
1999        }
2000
2001        // Object values are complex types - dispatch based on "type" field
2002        if let Value::Object(map) = &value {
2003            if let Some(Value::String(type_str)) = map.get("type") {
2004                return match type_str.as_str() {
2005                    "array" => ArrayType::deserialize(value)
2006                        .map(|at| DataType::Array(Box::new(at)))
2007                        .map_err(|e| Error::custom(e.to_string())),
2008                    "struct" => StructType::deserialize(value)
2009                        .map(|st| DataType::Struct(Box::new(st)))
2010                        .map_err(|e| Error::custom(e.to_string())),
2011                    "map" => MapType::deserialize(value)
2012                        .map(|mt| DataType::Map(Box::new(mt)))
2013                        .map_err(|e| Error::custom(e.to_string())),
2014                    _ => Err(Error::custom(format!("Unknown complex type: '{type_str}'"))),
2015                };
2016            }
2017        }
2018
2019        // Fallback error with the actual value that failed
2020        Err(Error::custom(format!(
2021            "Invalid data type: {}",
2022            serde_json::to_string(&value).unwrap_or_else(|_| format!("{value:?}"))
2023        )))
2024    }
2025}
2026
2027/// cbindgen:ignore
2028impl DataType {
2029    pub const STRING: Self = DataType::Primitive(PrimitiveType::String);
2030    pub const LONG: Self = DataType::Primitive(PrimitiveType::Long);
2031    pub const INTEGER: Self = DataType::Primitive(PrimitiveType::Integer);
2032    pub const SHORT: Self = DataType::Primitive(PrimitiveType::Short);
2033    pub const BYTE: Self = DataType::Primitive(PrimitiveType::Byte);
2034    pub const FLOAT: Self = DataType::Primitive(PrimitiveType::Float);
2035    pub const DOUBLE: Self = DataType::Primitive(PrimitiveType::Double);
2036    pub const BOOLEAN: Self = DataType::Primitive(PrimitiveType::Boolean);
2037    pub const BINARY: Self = DataType::Primitive(PrimitiveType::Binary);
2038    pub const DATE: Self = DataType::Primitive(PrimitiveType::Date);
2039    pub const TIMESTAMP: Self = DataType::Primitive(PrimitiveType::Timestamp);
2040    pub const TIMESTAMP_NTZ: Self = DataType::Primitive(PrimitiveType::TimestampNtz);
2041    #[cfg(feature = "nanosecond-timestamps")]
2042    pub const TIMESTAMP_NANOS: Self = DataType::Primitive(PrimitiveType::TimestampNanos);
2043    /// Create a new decimal type with the given precision and scale.
2044    pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
2045        Ok(PrimitiveType::decimal(precision, scale)?.into())
2046    }
2047
2048    /// Create a new struct type with the given fields.
2049    pub fn try_struct_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2050        Ok(StructType::try_new(fields)?.into())
2051    }
2052
2053    /// Create a new struct type from a fallible iterator of fields.
2054    pub fn try_struct_type_from_results<E: Into<Error>>(
2055        fields: impl IntoIterator<Item = Result<StructField, E>>,
2056    ) -> DeltaResult<Self> {
2057        StructType::try_from_results(fields).map(Self::from)
2058    }
2059
2060    /// Create a new struct type with the given fields without validating them.
2061    pub(crate) fn struct_type_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
2062        StructType::new_unchecked(fields).into()
2063    }
2064
2065    /// Create a new unshredded [`DataType::Variant`]. This data type is a struct of two not-null
2066    /// binary fields: `metadata` and `value`.
2067    pub fn unshredded_variant() -> Self {
2068        DataType::Variant(Box::new(StructType::new_unchecked([
2069            StructField::not_null("metadata", DataType::BINARY),
2070            StructField::not_null("value", DataType::BINARY),
2071        ])))
2072    }
2073
2074    /// Create a new [`DataType::Variant`] from the provided fields. For unshredded variants, you
2075    /// should prefer using [`DataType::unshredded_variant`].
2076    pub fn variant_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2077        // Different from regular StructTypes, Variants are not allowed to contain metadata columns
2078        // at all, so we also need to check their top-level primitive types.
2079        Ok(DataType::Variant(Box::new(StructType::try_from_results(
2080            fields.into_iter().map(|field| {
2081                if field.is_metadata_column() {
2082                    Err(Error::schema(
2083                        "Metadata columns are not allowed in Variant types".to_string(),
2084                    ))
2085                } else {
2086                    Ok(field)
2087                }
2088            }),
2089        )?)))
2090    }
2091
2092    /// Attempt to convert this data type to a [`PrimitiveType`]. Returns `None` if this is a
2093    /// non-primitive type.
2094    pub fn as_primitive_opt(&self) -> Option<&PrimitiveType> {
2095        match self {
2096            DataType::Primitive(ptype) => Some(ptype),
2097            _ => None,
2098        }
2099    }
2100
2101    fn fmt_recursive(&self, f: &mut Formatter<'_>, levels: &mut Vec<bool>) -> std::fmt::Result {
2102        match self {
2103            DataType::Struct(inner) => write_struct_type(inner, f, levels),
2104
2105            DataType::Array(inner) => {
2106                levels.push(true); // only one child → last
2107                write_indent(f, levels)?;
2108                writeln!(f, "array_element: {}", inner.element_type)?;
2109                inner.element_type.fmt_recursive(f, levels)?;
2110                levels.pop();
2111                Ok(())
2112            }
2113
2114            DataType::Map(inner) => {
2115                // key
2116                levels.push(false); // map_key is NOT last
2117                write_indent(f, levels)?;
2118                writeln!(f, "map_key: {}", inner.key_type)?;
2119                inner.key_type.fmt_recursive(f, levels)?;
2120                levels.pop();
2121
2122                // value
2123                levels.push(true); // map_value IS last at this level
2124                write_indent(f, levels)?;
2125                writeln!(f, "map_value: {}", inner.value_type)?;
2126                inner.value_type.fmt_recursive(f, levels)?;
2127                levels.pop();
2128                Ok(())
2129            }
2130
2131            _ => Ok(()),
2132        }
2133    }
2134}
2135
2136impl Display for DataType {
2137    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2138        match self {
2139            DataType::Primitive(p) => write!(f, "{p}"),
2140            DataType::Array(a) => write!(f, "array<{}>", a.element_type),
2141            DataType::Struct(s) => {
2142                write!(f, "struct<")?;
2143                for (i, field) in s.fields().enumerate() {
2144                    if i > 0 {
2145                        write!(f, ", ")?;
2146                    }
2147                    write!(f, "{}: {}", field.name, field.data_type)?;
2148                }
2149                write!(f, ">")
2150            }
2151            DataType::Map(m) => write!(f, "map<{}, {}>", m.key_type, m.value_type),
2152            DataType::Variant(_) => write!(f, "variant"),
2153        }
2154    }
2155}
2156
2157struct GetSchemaLeaves {
2158    path: Vec<String>,
2159    names: Vec<ColumnName>,
2160    types: Vec<DataType>,
2161}
2162impl GetSchemaLeaves {
2163    fn new(own_name: Option<&str>) -> Self {
2164        Self {
2165            path: own_name.into_iter().map(|s| s.to_string()).collect(),
2166            names: vec![],
2167            types: vec![],
2168        }
2169    }
2170}
2171
2172impl<'a> SchemaTransform<'a> for GetSchemaLeaves {
2173    transform_output_type!(|'a, T| ());
2174
2175    fn transform_struct_field(&mut self, field: &'a StructField) {
2176        self.path.push(field.name.clone());
2177        if let DataType::Struct(_) = field.data_type {
2178            self.recurse_into_struct_field(field);
2179        } else {
2180            self.names.push(ColumnName::new(&self.path));
2181            self.types.push(field.data_type.clone());
2182        }
2183        self.path.pop();
2184    }
2185}
2186
2187struct MakePhysical<'a> {
2188    column_mapping_mode: ColumnMappingMode,
2189    /// Logical path of current field's parent, used for error messages.
2190    logical_path: Vec<&'a str>,
2191    /// `delta.columnMapping.id` -> first claimer logical name.
2192    seen_ids: HashMap<i64, &'a str>,
2193    /// Stack of sibling-`physicalName` maps. The top of the stack holds the current field's
2194    /// siblings: key is the sibling's physical name, value is its logical name. Frames are
2195    /// pushed in `transform_struct` (root struct included) and popped after iterating its
2196    /// fields. Only structs introduce siblings; arrays/maps don't push frames since their
2197    /// elements / keys / values are anonymous.
2198    sibling_names_stack: Vec<HashMap<&'a str, &'a str>>,
2199}
2200impl<'a> MakePhysical<'a> {
2201    fn new(column_mapping_mode: ColumnMappingMode) -> Self {
2202        Self {
2203            column_mapping_mode,
2204            logical_path: vec![],
2205            seen_ids: HashMap::new(),
2206            sibling_names_stack: vec![],
2207        }
2208    }
2209
2210    fn transform_inner<T>(
2211        &mut self,
2212        logical_name: &'a str,
2213        transform: impl FnOnce(&mut Self) -> DeltaResult<T>,
2214    ) -> DeltaResult<T> {
2215        self.logical_path.push(logical_name);
2216        let result = transform(self);
2217        self.logical_path.pop();
2218        result
2219    }
2220}
2221impl<'a> SchemaTransform<'a> for MakePhysical<'a> {
2222    transform_output_type!(|'a, T| DeltaResult<Cow<'a, T>>);
2223
2224    fn transform_struct(&mut self, stype: &'a StructType) -> DeltaResult<Cow<'a, StructType>> {
2225        self.sibling_names_stack.push(HashMap::new());
2226        let result = self.recurse_into_struct(stype);
2227        self.sibling_names_stack.pop();
2228        result
2229    }
2230
2231    fn transform_array_element(&mut self, etype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
2232        self.transform_inner("<array element>", |this| this.transform(etype))
2233    }
2234    fn transform_map_key(&mut self, ktype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
2235        self.transform_inner("<map key>", |this| this.transform(ktype))
2236    }
2237    fn transform_map_value(&mut self, vtype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
2238        self.transform_inner("<map value>", |this| this.transform(vtype))
2239    }
2240    fn transform_struct_field(
2241        &mut self,
2242        field: &'a StructField,
2243    ) -> DeltaResult<Cow<'a, StructField>> {
2244        let (physical_name, _id) = validate_and_extract_column_mapping_annotations(
2245            field,
2246            self.column_mapping_mode,
2247            &self.logical_path,
2248            Some(&mut self.seen_ids),
2249            self.sibling_names_stack.last_mut(),
2250        )?;
2251
2252        if field.is_metadata_column() {
2253            return Ok(Cow::Borrowed(field));
2254        }
2255
2256        self.transform_inner(field.name(), |this| {
2257            let field = this.recurse_into_struct_field(field)?;
2258
2259            let metadata = field.logical_to_physical_metadata(this.column_mapping_mode);
2260            let name = physical_name.to_owned();
2261
2262            Ok(Cow::Owned(field.with_name(name).with_metadata(metadata)))
2263        })
2264    }
2265
2266    fn transform_variant(&mut self, stype: &'a StructType) -> DeltaResult<Cow<'a, StructType>> {
2267        // There is no column mapping metadata inside the struct fields of a variant, so
2268        // we do not recurse into the variant fields
2269        Ok(Cow::Borrowed(stype))
2270    }
2271}
2272
2273#[cfg(test)]
2274mod tests {
2275    use rstest::rstest;
2276    use serde_json;
2277
2278    use super::*;
2279    use crate::table_features::ColumnMappingMode;
2280    use crate::utils::test_utils::{
2281        assert_result_error_with_message, column_mapping_physical_name_dedup_fixtures as fixtures,
2282        test_deep_nested_schema_missing_leaf_cm,
2283    };
2284
/// JSON fixture: a nullable array-of-struct field `e` whose outer field and nested field `d`
/// both carry column mapping id / physical name metadata; the outer field also carries a
/// `delta.identity.start` entry.
fn example_schema_metadata() -> &'static str {
    r#"
            {
                "name": "e",
                "type": {
                    "type": "array",
                    "elementType": {
                        "type": "struct",
                        "fields": [
                            {
                                "name": "d",
                                "type": "integer",
                                "nullable": false,
                                "metadata": {
                                    "delta.columnMapping.id": 5,
                                    "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
                                }
                            }
                        ]
                    },
                    "containsNull": true
                },
                "nullable": true,
                "metadata": {
                    "delta.columnMapping.id": 4,
                    "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1",
                    "delta.identity.start": 2147483648
                }
            }"#
}
2315
/// Field JSON with a primitive, an array, an array-of-struct and a map type each
/// deserializes into the matching `DataType` variant.
#[test]
fn test_serde_data_types() {
    // Primitive type.
    let primitive_json = r#"
        {
            "name": "a",
            "type": "integer",
            "nullable": false,
            "metadata": {}
        }
        "#;
    let primitive_field: StructField = serde_json::from_str(primitive_json).unwrap();
    assert!(matches!(primitive_field.data_type, DataType::INTEGER));

    // Array of a primitive type.
    let array_json = r#"
        {
            "name": "c",
            "type": {
                "type": "array",
                "elementType": "integer",
                "containsNull": false
            },
            "nullable": true,
            "metadata": {}
        }
        "#;
    let array_field: StructField = serde_json::from_str(array_json).unwrap();
    assert!(matches!(array_field.data_type, DataType::Array(_)));

    // Array whose elements are structs.
    let nested_json = r#"
        {
            "name": "e",
            "type": {
                "type": "array",
                "elementType": {
                    "type": "struct",
                    "fields": [
                        {
                            "name": "d",
                            "type": "integer",
                            "nullable": false,
                            "metadata": {}
                        }
                    ]
                },
                "containsNull": true
            },
            "nullable": true,
            "metadata": {}
        }
        "#;
    let nested_field: StructField = serde_json::from_str(nested_json).unwrap();
    assert!(matches!(nested_field.data_type, DataType::Array(_)));
    let DataType::Array(array) = nested_field.data_type else {
        unreachable!()
    };
    assert!(matches!(array.element_type, DataType::Struct(_)));

    // Map type.
    let map_json = r#"
        {
            "name": "f",
            "type": {
                "type": "map",
                "keyType": "string",
                "valueType": "string",
                "valueContainsNull": true
            },
            "nullable": true,
            "metadata": {}
        }
        "#;
    let map_field: StructField = serde_json::from_str(map_json).unwrap();
    assert!(matches!(map_field.data_type, DataType::Map(_)));
}
2389
2390    #[test]
2391    fn test_roundtrip_decimal() {
2392        let data = r#"
2393        {
2394            "name": "a",
2395            "type": "decimal(10, 2)",
2396            "nullable": false,
2397            "metadata": {}
2398        }
2399        "#;
2400        let field: StructField = serde_json::from_str(data).unwrap();
2401        assert_eq!(field.data_type, DataType::decimal(10, 2).unwrap());
2402
2403        let json_str = serde_json::to_string(&field).unwrap();
2404        assert_eq!(
2405            json_str,
2406            r#"{"name":"a","type":"decimal(10,2)","nullable":false,"metadata":{}}"#
2407        );
2408    }
2409
2410    #[test]
2411    fn test_roundtrip_variant() {
2412        let data = r#"
2413        {
2414            "name": "v",
2415            "type": "variant",
2416            "nullable": false,
2417            "metadata": {}
2418        }
2419        "#;
2420        let field: StructField = serde_json::from_str(data).unwrap();
2421        assert_eq!(field.data_type, DataType::unshredded_variant());
2422
2423        let json_str = serde_json::to_string(&field).unwrap();
2424        assert_eq!(
2425            json_str,
2426            r#"{"name":"v","type":"variant","nullable":false,"metadata":{}}"#
2427        );
2428    }
2429
2430    #[test]
2431    fn test_unshredded_variant() {
2432        let unshredded_variant_type = DataType::unshredded_variant();
2433
2434        match &unshredded_variant_type {
2435            DataType::Variant(struct_type) => {
2436                let fields: Vec<_> = struct_type.fields().collect();
2437                assert_eq!(fields.len(), 2);
2438
2439                assert_eq!(fields[0].name, "metadata");
2440                assert_eq!(fields[0].data_type, DataType::BINARY);
2441                assert!(!fields[0].nullable);
2442
2443                assert_eq!(fields[1].name, "value");
2444                assert_eq!(fields[1].data_type, DataType::BINARY);
2445                assert!(!fields[1].nullable);
2446            }
2447            _ => panic!("Expected DataType::Variant, got {unshredded_variant_type:?}"),
2448        }
2449    }
2450
2451    #[rstest]
2452    #[case("interval second")]
2453    #[case("interval day")]
2454    #[case("money")]
2455    fn test_unsupported_type_error_message(#[case] unsupported_type: &str) {
2456        let data = format!(
2457            r#"{{
2458                "name": "test_field",
2459                "type": "{unsupported_type}",
2460                "nullable": false,
2461                "metadata": {{}}
2462            }}"#
2463        );
2464        let result: Result<StructField, _> = serde_json::from_str(&data);
2465        assert!(result.is_err());
2466        let err = result.unwrap_err();
2467        let expected_msg = format!("Unsupported Delta table type: '{unsupported_type}'");
2468        assert!(
2469            err.to_string().contains(&expected_msg),
2470            "Expected error message about unsupported type '{unsupported_type}', got: {err}"
2471        );
2472    }
2473
2474    #[rstest]
2475    #[case("string", DataType::STRING)]
2476    #[case("long", DataType::LONG)]
2477    #[case("integer", DataType::INTEGER)]
2478    #[case("short", DataType::SHORT)]
2479    #[case("byte", DataType::BYTE)]
2480    #[case("float", DataType::FLOAT)]
2481    #[case("double", DataType::DOUBLE)]
2482    #[case("boolean", DataType::BOOLEAN)]
2483    #[case("binary", DataType::BINARY)]
2484    #[case("date", DataType::DATE)]
2485    #[case("timestamp", DataType::TIMESTAMP)]
2486    #[case("timestamp_ntz", DataType::TIMESTAMP_NTZ)]
2487    fn test_primitive_type_deserialization_still_works(
2488        #[case] type_str: &str,
2489        #[case] expected_type: DataType,
2490    ) {
2491        let data = format!(
2492            r#"{{
2493                "name": "test_field",
2494                "type": "{type_str}",
2495                "nullable": false,
2496                "metadata": {{}}
2497            }}"#
2498        );
2499        let field: StructField = serde_json::from_str(&data).unwrap();
2500        assert_eq!(field.data_type, expected_type);
2501    }
2502
2503    #[rstest]
2504    #[case(10, 2)]
2505    #[case(16, 4)]
2506    #[case(38, 10)]
2507    fn test_decimal_with_primitive_deserializer(#[case] precision: u8, #[case] scale: u8) {
2508        let data = format!(
2509            r#"{{
2510                "name": "test_decimal",
2511                "type": "decimal({precision},{scale})",
2512                "nullable": false,
2513                "metadata": {{}}
2514            }}"#
2515        );
2516        let field: StructField = serde_json::from_str(&data).unwrap();
2517        assert_eq!(
2518            field.data_type,
2519            DataType::decimal(precision, scale).unwrap()
2520        );
2521    }
2522
2523    #[rstest]
2524    #[case("decimal(invalid)", "Invalid precision in decimal")]
2525    #[case("decimal(10)", "Invalid scale in decimal")]
2526    #[case("decimal()", "Invalid precision in decimal")]
2527    #[case("decimal(10,2,99)", "Invalid decimal format (expected 2 parts)")]
2528    fn test_invalid_decimal_format(#[case] invalid_type: &str, #[case] expected_error: &str) {
2529        let data = format!(
2530            r#"{{
2531                "name": "invalid",
2532                "type": "{invalid_type}",
2533                "nullable": false,
2534                "metadata": {{}}
2535            }}"#
2536        );
2537        let result: Result<StructField, _> = serde_json::from_str(&data);
2538        assert!(result.is_err());
2539        let err = result.unwrap_err();
2540        assert!(
2541            err.to_string().contains(expected_error),
2542            "Expected error containing '{expected_error}', got: {err}"
2543        );
2544    }
2545
2546    #[rstest]
2547    #[case(
2548        r#"{"type": "array", "elementType": "integer", "containsNull": false}"#,
2549        DataType::Array(Box::new(ArrayType::new(DataType::INTEGER, false)))
2550    )]
2551    #[case(
2552        r#"{"type": "struct", "fields": [{"name": "a", "type": "integer", "nullable": false, "metadata": {}}, {"name": "b", "type": "string", "nullable": true, "metadata": {}}]}"#,
2553        DataType::Struct(Box::new(StructType::new_unchecked([
2554            StructField::new("a", DataType::INTEGER, false),
2555            StructField::new("b", DataType::STRING, true),
2556        ])))
2557    )]
2558    #[case(
2559        r#"{"type": "map", "keyType": "string", "valueType": "integer", "valueContainsNull": true}"#,
2560        DataType::Map(Box::new(MapType::new(DataType::STRING, DataType::INTEGER, true)))
2561    )]
2562    #[case("\"string\"", DataType::STRING)]
2563    #[case("\"long\"", DataType::LONG)]
2564    #[case("\"integer\"", DataType::INTEGER)]
2565    #[case("\"short\"", DataType::SHORT)]
2566    #[case("\"byte\"", DataType::BYTE)]
2567    #[case("\"float\"", DataType::FLOAT)]
2568    #[case("\"double\"", DataType::DOUBLE)]
2569    #[case("\"boolean\"", DataType::BOOLEAN)]
2570    #[case("\"binary\"", DataType::BINARY)]
2571    #[case("\"date\"", DataType::DATE)]
2572    #[case("\"timestamp\"", DataType::TIMESTAMP)]
2573    #[case("\"timestamp_ntz\"", DataType::TIMESTAMP_NTZ)]
2574    #[case("\"variant\"", DataType::unshredded_variant())]
2575    fn test_data_type_deserialization(#[case] type_json: &str, #[case] expected: DataType) {
2576        let data_type: DataType = serde_json::from_str(type_json).unwrap();
2577        assert_eq!(data_type, expected);
2578    }
2579
2580    #[test]
2581    fn test_make_physical_no_column_mapping() {
2582        let field = StructField::nullable(
2583            "e",
2584            ArrayType::new(
2585                StructType::new_unchecked([StructField::not_null("d", DataType::INTEGER)]).into(),
2586                true,
2587            ),
2588        );
2589        let physical_field = field.make_physical(ColumnMappingMode::None).unwrap();
2590
2591        assert_eq!(physical_field.name, "e");
2592        assert!(physical_field
2593            .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2594            .is_none());
2595        assert!(physical_field
2596            .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
2597            .is_none());
2598
2599        let DataType::Array(atype) = physical_field.data_type else {
2600            panic!("Expected an Array");
2601        };
2602        let DataType::Struct(stype) = atype.element_type else {
2603            panic!("Expected a Struct");
2604        };
2605        let struct_field = stype.fields.get_index(0).unwrap().1;
2606        assert_eq!(struct_field.name, "d");
2607    }
2608
2609    #[test]
2610    fn test_make_physical_rejects_annotated_fields_when_column_mapping_disabled() {
2611        let data = example_schema_metadata();
2612        let field: StructField = serde_json::from_str(data).unwrap();
2613        assert!(field.make_physical(ColumnMappingMode::None).is_err());
2614    }
2615
2616    #[test]
2617    fn test_make_physical_rejects_unannotated_leaf_in_deep_nesting() {
2618        let schema = test_deep_nested_schema_missing_leaf_cm();
2619        let field = schema.fields().next().unwrap();
2620        let err = field
2621            .make_physical(ColumnMappingMode::Name)
2622            .unwrap_err()
2623            .to_string();
2624        assert!(
2625            err.contains("top.`<array element>`.mid_field.`<map value>`.leaf"),
2626            "Expected full nested path in error, got: {err}"
2627        );
2628    }
2629
2630    #[test]
2631    fn test_make_physical_rejects_duplicate_column_mapping_ids() {
2632        use crate::schema::ColumnMetadataKey;
2633
2634        fn cm_field(name: &str, id: i64, data_type: impl Into<DataType>) -> StructField {
2635            StructField::not_null(name, data_type).with_metadata([
2636                (
2637                    ColumnMetadataKey::ColumnMappingId.as_ref(),
2638                    MetadataValue::Number(id),
2639                ),
2640                (
2641                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2642                    MetadataValue::String(format!("col-{name}")),
2643                ),
2644            ])
2645        }
2646
2647        let inner = StructType::new_unchecked([
2648            cm_field("x", 3, DataType::INTEGER),
2649            cm_field("y", 4, DataType::STRING),
2650        ]);
2651        let schema = StructType::new_unchecked([
2652            cm_field("a", 1, DataType::INTEGER),
2653            cm_field(
2654                "b",
2655                2,
2656                ArrayType::new(DataType::Struct(Box::new(inner)), true),
2657            ),
2658            cm_field("c", 3, DataType::STRING),
2659        ]);
2660        assert_result_error_with_message(
2661            schema.make_physical(ColumnMappingMode::Id),
2662            "Duplicate column mapping ID",
2663        );
2664    }
2665
2666    #[rstest]
2667    #[case::accepted_same_phy_name_different_paths(fixtures::same_phy_name_different_paths(), /*expected_error_substring*/None)]
2668    #[case::rejected_deeply_nested_repeat_physical_paths(
2669        fixtures::deeply_nested_repeat_physical_paths(),
2670        Some({
2671            let (a, b) =
2672                fixtures::deeply_nested_collider_paths();
2673            format!("assigned to both '{a}' and '{b}'")
2674        }),
2675    )]
2676    #[case::multiple_physical_name_collisions_reports_first(
2677        fixtures::multiple_physical_name_collisions(),
2678        Some("'p' assigned to both 'a' and 'b'".to_string()),
2679    )]
2680    fn test_make_physical_dup_physical_name(
2681        #[case] schema: StructType,
2682        #[case] expected_error_substring: Option<String>,
2683    ) {
2684        // The same dedup rules should apply under both CM modes.
2685        for mode in [ColumnMappingMode::Name, ColumnMappingMode::Id] {
2686            let result = schema.make_physical(mode);
2687            match &expected_error_substring {
2688                None => {
2689                    result.expect("The input schema should be valid");
2690                }
2691                Some(substr) => {
2692                    assert_result_error_with_message(result.as_ref().map(|_| ()), substr);
2693                    if let Err(e) = &result {
2694                        assert!(
2695                            !e.to_string().contains("'q'"),
2696                            "walker must short-circuit on first collision under {mode:?}; got: {e}"
2697                        );
2698                    }
2699                }
2700            }
2701        }
2702    }
2703
2704    #[test]
2705    fn test_make_physical_column_mapping() {
2706        [ColumnMappingMode::Name, ColumnMappingMode::Id]
2707            .into_iter()
2708            .for_each(|mode| {
2709                let data = example_schema_metadata();
2710
2711                let field: StructField = serde_json::from_str(data).unwrap();
2712
2713                let col_id = field
2714                    .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2715                    .unwrap();
2716                let id_start = field
2717                    .get_config_value(&ColumnMetadataKey::IdentityStart)
2718                    .unwrap();
2719                assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
2720                assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64));
2721                assert_eq!(
2722                    field.physical_name(mode),
2723                    "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2724                );
2725                let physical_field = field.make_physical(mode).unwrap();
2726
2727                // Parquet field id should only be present in id column mapping mode
2728                match mode {
2729                    ColumnMappingMode::Id => {
2730                        assert!(matches!(
2731                            physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2732                            Some(MetadataValue::Number(4))
2733                        ));
2734
2735                        assert!(matches!(
2736                            physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2737                            Some(MetadataValue::Number(4))
2738                        ));
2739                    }
2740                    ColumnMappingMode::Name => {
2741                        assert!(matches!(
2742                            physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2743                            Some(MetadataValue::Number(4))
2744                        ));
2745                        assert!(matches!(
2746                            physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2747                            Some(MetadataValue::Number(4))
2748                        ));
2749                    }
2750                    ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2751                }
2752
2753                assert_eq!(
2754                    physical_field.name,
2755                    "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2756                );
2757                let DataType::Array(atype) = physical_field.data_type else {
2758                    panic!("Expected an Array");
2759                };
2760                let DataType::Struct(stype) = atype.element_type else {
2761                    panic!("Expected a Struct");
2762                };
2763
2764                let struct_field = stype.fields.get_index(0).unwrap().1;
2765                assert_eq!(
2766                    struct_field.name,
2767                    "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
2768                );
2769
2770                // The subfield should also have ParquetFieldId present it column mapping id mode
2771                match mode {
2772                    ColumnMappingMode::Id => {
2773                        assert!(matches!(
2774                            struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2775                            Some(MetadataValue::Number(5))
2776                        ));
2777                        assert!(matches!(
2778                            struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2779                            Some(MetadataValue::Number(5))
2780                        ));
2781                    }
2782                    ColumnMappingMode::Name => {
2783                        assert!(matches!(
2784                            struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2785                            Some(MetadataValue::Number(5))
2786                        ));
2787                        assert!(matches!(
2788                            struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2789                            Some(MetadataValue::Number(5))
2790                        ));
2791                    }
2792                    ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2793                }
2794            });
2795    }
2796
2797    #[test]
2798    fn test_make_physical_passes_metadata_column_through() {
2799        let field = StructField::create_metadata_column(
2800            "_metadata.row_index",
2801            MetadataColumnSpec::RowIndex,
2802        );
2803        for mode in [
2804            ColumnMappingMode::None,
2805            ColumnMappingMode::Name,
2806            ColumnMappingMode::Id,
2807        ] {
2808            let physical = field.make_physical(mode).unwrap();
2809            assert_eq!(physical.name(), "_metadata.row_index");
2810            assert!(physical.is_metadata_column());
2811        }
2812    }
2813
2814    #[test]
2815    fn test_make_physical_rejects_metadata_column_with_cm_annotations() {
2816        let field = StructField::create_metadata_column(
2817            "_metadata.row_index",
2818            MetadataColumnSpec::RowIndex,
2819        )
2820        .add_metadata([(
2821            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2822            MetadataValue::String("phys".to_string()),
2823        )]);
2824        assert_result_error_with_message(
2825            field.make_physical(ColumnMappingMode::Name),
2826            "must not have column mapping annotations",
2827        );
2828    }
2829
2830    #[test]
2831    fn test_read_schemas() {
2832        let file = std::fs::File::open("./tests/serde/schema.json").unwrap();
2833        let schema: Result<Schema, _> = serde_json::from_reader(file);
2834        assert!(schema.is_ok());
2835
2836        let file = std::fs::File::open("./tests/serde/checkpoint_schema.json").unwrap();
2837        let schema: Result<Schema, _> = serde_json::from_reader(file);
2838        assert!(schema.is_ok())
2839    }
2840
2841    #[test]
2842    fn test_invalid_decimal() {
2843        let data = r#"
2844        {
2845            "name": "a",
2846            "type": "decimal(39, 10)",
2847            "nullable": false,
2848            "metadata": {}
2849        }
2850        "#;
2851        assert!(serde_json::from_str::<StructField>(data).is_err());
2852
2853        let data = r#"
2854        {
2855            "name": "a",
2856            "type": "decimal(10, 39)",
2857            "nullable": false,
2858            "metadata": {}
2859        }
2860        "#;
2861        assert!(serde_json::from_str::<StructField>(data).is_err());
2862    }
2863
2864    #[test]
2865    fn test_metadata_value_to_string() {
2866        assert_eq!(MetadataValue::Number(0).to_string(), "0");
2867        assert_eq!(
2868            MetadataValue::String("hello".to_string()).to_string(),
2869            "hello"
2870        );
2871        assert_eq!(MetadataValue::Boolean(true).to_string(), "true");
2872        assert_eq!(MetadataValue::Boolean(false).to_string(), "false");
2873        let object_json = serde_json::json!({ "an": "object" });
2874        assert_eq!(
2875            MetadataValue::Other(object_json).to_string(),
2876            "{\"an\":\"object\"}"
2877        );
2878        let array_json = serde_json::json!(["an", "array"]);
2879        assert_eq!(
2880            MetadataValue::Other(array_json).to_string(),
2881            "[\"an\",\"array\"]"
2882        );
2883    }
2884
2885    #[test]
2886    fn test_num_fields() {
2887        let schema = StructType::new_unchecked([]);
2888        assert!(schema.num_fields() == 0);
2889        let schema = StructType::new_unchecked([
2890            StructField::nullable("a", DataType::LONG),
2891            StructField::nullable("b", DataType::LONG),
2892            StructField::nullable("c", DataType::LONG),
2893            StructField::nullable("d", DataType::LONG),
2894        ]);
2895        assert_eq!(schema.num_fields(), 4);
2896        let schema = StructType::new_unchecked([
2897            StructField::nullable("b", DataType::LONG),
2898            StructField::not_null("b", DataType::LONG),
2899            StructField::nullable("c", DataType::LONG),
2900            StructField::nullable("c", DataType::LONG),
2901        ]);
2902        assert_eq!(schema.num_fields(), 2);
2903    }
2904
2905    #[test]
2906    fn test_has_invariants() {
2907        // Schema with no invariants
2908        let schema = StructType::new_unchecked([
2909            StructField::nullable("a", DataType::STRING),
2910            StructField::nullable("b", DataType::INTEGER),
2911        ]);
2912        assert!(!schema_has_invariants(&schema));
2913
2914        // Schema with top-level invariant
2915        let mut field = StructField::nullable("c", DataType::STRING);
2916        field.metadata.insert(
2917            ColumnMetadataKey::Invariants.as_ref().to_string(),
2918            MetadataValue::String("c > 0".to_string()),
2919        );
2920
2921        let schema =
2922            StructType::new_unchecked([StructField::nullable("a", DataType::STRING), field]);
2923        assert!(schema_has_invariants(&schema));
2924
2925        // Schema with nested invariant in a struct
2926        let nested_field = StructField::nullable(
2927            "nested_c",
2928            DataType::try_struct_type([{
2929                let mut field = StructField::nullable("d", DataType::INTEGER);
2930                field.metadata.insert(
2931                    ColumnMetadataKey::Invariants.as_ref().to_string(),
2932                    MetadataValue::String("d > 0".to_string()),
2933                );
2934                field
2935            }])
2936            .unwrap(),
2937        );
2938
2939        let schema = StructType::new_unchecked([
2940            StructField::nullable("a", DataType::STRING),
2941            StructField::nullable("b", DataType::INTEGER),
2942            nested_field,
2943        ]);
2944        assert!(schema_has_invariants(&schema));
2945
2946        // Schema with nested invariant in an array of structs
2947        let array_field = StructField::nullable(
2948            "array_field",
2949            ArrayType::new(
2950                DataType::try_struct_type([{
2951                    let mut field = StructField::nullable("d", DataType::INTEGER);
2952                    field.metadata.insert(
2953                        ColumnMetadataKey::Invariants.as_ref().to_string(),
2954                        MetadataValue::String("d > 0".to_string()),
2955                    );
2956                    field
2957                }])
2958                .unwrap(),
2959                true,
2960            ),
2961        );
2962
2963        let schema = StructType::new_unchecked([
2964            StructField::nullable("a", DataType::STRING),
2965            StructField::nullable("b", DataType::INTEGER),
2966            array_field,
2967        ]);
2968        assert!(schema_has_invariants(&schema));
2969
2970        // Schema with nested invariant in a map value that's a struct
2971        let map_field = StructField::nullable(
2972            "map_field",
2973            MapType::new(
2974                DataType::STRING,
2975                DataType::try_struct_type([{
2976                    let mut field = StructField::nullable("d", DataType::INTEGER);
2977                    field.metadata.insert(
2978                        ColumnMetadataKey::Invariants.as_ref().to_string(),
2979                        MetadataValue::String("d > 0".to_string()),
2980                    );
2981                    field
2982                }])
2983                .unwrap(),
2984                true,
2985            ),
2986        );
2987
2988        let schema = StructType::new_unchecked([
2989            StructField::nullable("a", DataType::STRING),
2990            StructField::nullable("b", DataType::INTEGER),
2991            map_field,
2992        ]);
2993        assert!(schema_has_invariants(&schema));
2994    }
2995
2996    fn all_nullable_schema() -> StructType {
2997        StructType::new_unchecked([
2998            StructField::nullable("a", DataType::STRING),
2999            StructField::nullable("b", DataType::INTEGER),
3000        ])
3001    }
3002
3003    fn top_level_non_null_schema() -> StructType {
3004        StructType::new_unchecked([
3005            StructField::not_null("id", DataType::INTEGER),
3006            StructField::nullable("name", DataType::STRING),
3007        ])
3008    }
3009
3010    fn nested_non_null_schema() -> StructType {
3011        let nested_field = StructField::nullable(
3012            "parent",
3013            DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)]).unwrap(),
3014        );
3015        StructType::new_unchecked([StructField::nullable("a", DataType::STRING), nested_field])
3016    }
3017
3018    fn array_non_null_schema() -> StructType {
3019        let array_field = StructField::nullable(
3020            "arr",
3021            ArrayType::new(
3022                DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3023                    .unwrap(),
3024                true,
3025            ),
3026        );
3027        StructType::new_unchecked([array_field])
3028    }
3029
3030    fn map_non_null_schema() -> StructType {
3031        let map_field = StructField::nullable(
3032            "map",
3033            MapType::new(
3034                DataType::STRING,
3035                DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3036                    .unwrap(),
3037                true,
3038            ),
3039        );
3040        StructType::new_unchecked([map_field])
3041    }
3042
3043    fn variant_only_schema() -> StructType {
3044        // Variant internal fields (metadata, value) are protocol-defined non-null but
3045        // must NOT be counted as user-controlled non-null fields.
3046        StructType::new_unchecked([StructField::nullable("v", DataType::unshredded_variant())])
3047    }
3048
3049    #[rstest]
3050    #[case::all_nullable(all_nullable_schema(), false)]
3051    #[case::top_level(top_level_non_null_schema(), true)]
3052    #[case::nested_struct(nested_non_null_schema(), true)]
3053    #[case::array_element(array_non_null_schema(), true)]
3054    #[case::map_value(map_non_null_schema(), true)]
3055    #[case::variant_skipped(variant_only_schema(), false)]
3056    fn test_schema_contains_non_null_fields(#[case] schema: StructType, #[case] expected: bool) {
3057        assert_eq!(schema_contains_non_null_fields(&schema), expected);
3058    }
3059
3060    #[test]
3061    fn test_struct_type_iterator_basic() {
3062        let fields = vec![
3063            StructField::new("field1", DataType::STRING, true),
3064            StructField::new("field2", DataType::INTEGER, false),
3065            StructField::new("field3", DataType::BOOLEAN, true),
3066        ];
3067        let struct_type = StructType::new_unchecked(fields.clone());
3068
3069        // Test fields() method returns reference iterator
3070        let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3071        assert_eq!(field_names, vec!["field1", "field2", "field3"]);
3072
3073        // Test that we can still access the struct_type after using fields()
3074        assert_eq!(struct_type.field("field1").unwrap().name, "field1");
3075    }
3076
3077    #[test]
3078    fn test_struct_type_into_iterator_owned() {
3079        let fields = vec![
3080            StructField::new("a", DataType::STRING, true),
3081            StructField::new("b", DataType::INTEGER, false),
3082        ];
3083        let struct_type = StructType::new_unchecked(fields);
3084
3085        // Test owned iteration (consumes the struct)
3086        let mut field_names = Vec::new();
3087        for field in struct_type {
3088            field_names.push(field.name);
3089        }
3090        assert_eq!(field_names, vec!["a", "b"]);
3091    }
3092
3093    #[test]
3094    fn test_struct_type_into_iterator_references() {
3095        let fields = vec![
3096            StructField::new("x", DataType::DOUBLE, true),
3097            StructField::new("y", DataType::FLOAT, false),
3098            StructField::new("z", DataType::LONG, true),
3099        ];
3100        let struct_type = StructType::new_unchecked(fields);
3101
3102        // Test reference iteration (does not consume the struct)
3103        let mut field_names = Vec::new();
3104        for field in &struct_type {
3105            field_names.push(field.name().clone());
3106        }
3107        assert_eq!(field_names, vec!["x", "y", "z"]);
3108
3109        // Should still be able to use struct_type after iteration
3110        assert_eq!(struct_type.field("x").unwrap().name, "x");
3111    }
3112
3113    #[test]
3114    fn test_iterator_exact_size() {
3115        let fields = vec![
3116            StructField::new("field1", DataType::STRING, true),
3117            StructField::new("field2", DataType::INTEGER, false),
3118            StructField::new("field3", DataType::BOOLEAN, true),
3119            StructField::new("field4", DataType::DATE, true),
3120        ];
3121
3122        // Test ExactSizeIterator for reference iterator
3123        let struct_type = StructType::new_unchecked(fields.clone());
3124        let ref_iter = struct_type.fields();
3125        assert_eq!(ref_iter.len(), 4);
3126
3127        // Test ExactSizeIterator for &StructType into_iter
3128        let struct_type = StructType::new_unchecked(fields.clone());
3129        let into_ref_iter = (&struct_type).into_iter();
3130        assert_eq!(into_ref_iter.len(), 4);
3131
3132        // Test ExactSizeIterator for StructType into_iter (consuming)
3133        let struct_type = StructType::new_unchecked(fields);
3134        let into_owned_iter = struct_type.into_iter();
3135        assert_eq!(into_owned_iter.len(), 4);
3136    }
3137
3138    #[test]
3139    fn test_iterator_with_metadata() {
3140        let field_with_metadata = StructField::new("test_field", DataType::STRING, true)
3141            .with_metadata([("key1", MetadataValue::String("value1".to_string()))]);
3142
3143        let struct_type = StructType::new_unchecked([field_with_metadata]);
3144
3145        // Test that metadata is preserved through iteration
3146        for field in &struct_type {
3147            assert_eq!(field.metadata().len(), 1);
3148            assert_eq!(
3149                field.metadata().get("key1"),
3150                Some(&MetadataValue::String("value1".to_string()))
3151            );
3152        }
3153
3154        // Test consuming iterator preserves metadata
3155        for field in struct_type {
3156            assert_eq!(field.metadata().len(), 1);
3157            assert_eq!(
3158                field.metadata().get("key1"),
3159                Some(&MetadataValue::String("value1".to_string()))
3160            );
3161        }
3162    }
3163
3164    #[test]
3165    fn test_empty_struct_type_iterator() {
3166        let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3167
3168        // Test all iterator methods with empty struct
3169        assert_eq!(struct_type.fields().count(), 0);
3170        assert_eq!((&struct_type).into_iter().count(), 0);
3171        assert_eq!(struct_type.into_iter().count(), 0);
3172    }
3173
3174    #[test]
3175    fn test_iterator_order_preservation() {
3176        let fields = vec![
3177            StructField::new("zebra", DataType::STRING, true),
3178            StructField::new("apple", DataType::INTEGER, false),
3179            StructField::new("banana", DataType::BOOLEAN, true),
3180        ];
3181        let struct_type = StructType::new_unchecked(fields);
3182
3183        // IndexMap should preserve insertion order
3184        let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3185        assert_eq!(field_names, vec!["zebra", "apple", "banana"]);
3186
3187        // Order should be the same across different iterator methods
3188        let ref_names: Vec<_> = (&struct_type).into_iter().map(|f| f.name()).collect();
3189        assert_eq!(ref_names, vec!["zebra", "apple", "banana"]);
3190
3191        // Test consuming iterator maintains order too
3192        let owned_names: Vec<_> = struct_type.into_iter().map(|f| f.name).collect();
3193        assert_eq!(owned_names, vec!["zebra", "apple", "banana"]);
3194    }
3195
3196    #[test]
3197    fn test_iterator_collect() {
3198        let original_fields = vec![
3199            StructField::new("field1", DataType::STRING, true),
3200            StructField::new("field2", DataType::INTEGER, false),
3201        ];
3202        let struct_type = StructType::new_unchecked(original_fields.clone());
3203
3204        // Test collecting from reference iterator
3205        let collected_refs: Vec<&StructField> = struct_type.fields().collect();
3206        assert_eq!(collected_refs.len(), 2);
3207        assert_eq!(collected_refs[0].name, "field1");
3208        assert_eq!(collected_refs[1].name, "field2");
3209
3210        // Test collecting from consuming iterator
3211        let collected_owned: Vec<StructField> = struct_type.into_iter().collect();
3212        assert_eq!(collected_owned.len(), 2);
3213        assert_eq!(collected_owned[0].name, "field1");
3214        assert_eq!(collected_owned[1].name, "field2");
3215    }
3216
3217    #[test]
3218    fn test_iterator_functional_methods() {
3219        let fields = vec![
3220            StructField::new("nullable_string", DataType::STRING, true),
3221            StructField::new("required_int", DataType::INTEGER, false),
3222            StructField::new("nullable_bool", DataType::BOOLEAN, true),
3223            StructField::new("required_long", DataType::LONG, false),
3224        ];
3225        let struct_type = StructType::new_unchecked(fields);
3226
3227        // Test filter - find nullable fields
3228        let nullable_count = struct_type.fields().filter(|f| f.is_nullable()).count();
3229        assert_eq!(nullable_count, 2);
3230
3231        // Test map and filter chain
3232        let required_field_names: Vec<_> = struct_type
3233            .fields()
3234            .filter(|f| !f.is_nullable())
3235            .map(|f| f.name())
3236            .collect();
3237        assert_eq!(required_field_names, vec!["required_int", "required_long"]);
3238
3239        // Test enumerate
3240        for (index, field) in struct_type.fields().enumerate() {
3241            match index {
3242                0 => assert_eq!(field.name, "nullable_string"),
3243                1 => assert_eq!(field.name, "required_int"),
3244                2 => assert_eq!(field.name, "nullable_bool"),
3245                3 => assert_eq!(field.name, "required_long"),
3246                _ => panic!("Unexpected field index: {index}"),
3247            }
3248        }
3249    }
3250
3251    #[test]
3252    fn test_double_ended_iterator_ref() {
3253        let fields = vec![
3254            StructField::new("first", DataType::STRING, true),
3255            StructField::new("second", DataType::INTEGER, false),
3256            StructField::new("third", DataType::BOOLEAN, true),
3257            StructField::new("fourth", DataType::LONG, false),
3258        ];
3259        let struct_type = StructType::new_unchecked(fields);
3260
3261        // Test iterating from both ends using reference iterator
3262        let mut iter = struct_type.fields();
3263
3264        // Forward iteration
3265        assert_eq!(iter.next().unwrap().name, "first");
3266        assert_eq!(iter.next().unwrap().name, "second");
3267
3268        // Backward iteration
3269        assert_eq!(iter.next_back().unwrap().name, "fourth");
3270        assert_eq!(iter.next_back().unwrap().name, "third");
3271
3272        // Should be exhausted
3273        assert!(iter.next().is_none());
3274        assert!(iter.next_back().is_none());
3275    }
3276
3277    #[test]
3278    fn test_double_ended_iterator_owned() {
3279        let fields = vec![
3280            StructField::new("alpha", DataType::STRING, true),
3281            StructField::new("beta", DataType::INTEGER, false),
3282            StructField::new("gamma", DataType::BOOLEAN, true),
3283        ];
3284        let struct_type = StructType::new_unchecked(fields);
3285
3286        // Test iterating from both ends using owned iterator
3287        let mut iter = struct_type.into_iter();
3288
3289        // Backward iteration first
3290        assert_eq!(iter.next_back().unwrap().name, "gamma");
3291
3292        // Forward iteration
3293        assert_eq!(iter.next().unwrap().name, "alpha");
3294
3295        // Backward iteration again
3296        assert_eq!(iter.next_back().unwrap().name, "beta");
3297
3298        // Should be exhausted
3299        assert!(iter.next().is_none());
3300        assert!(iter.next_back().is_none());
3301    }
3302
3303    #[test]
3304    fn test_double_ended_iterator_collect_reverse() {
3305        let fields = vec![
3306            StructField::new("one", DataType::STRING, true),
3307            StructField::new("two", DataType::INTEGER, false),
3308            StructField::new("three", DataType::BOOLEAN, true),
3309        ];
3310        let struct_type = StructType::new_unchecked(fields);
3311
3312        // Test collecting in reverse order using DoubleEndedIterator
3313        let reversed_names: Vec<_> = struct_type.fields().rev().map(|f| f.name()).collect();
3314        assert_eq!(reversed_names, vec!["three", "two", "one"]);
3315
3316        // Test that we can still use the original struct
3317        assert_eq!(struct_type.field("two").unwrap().name, "two");
3318    }
3319
3320    #[test]
3321    fn test_double_ended_iterator_with_into_iter_ref() {
3322        let fields = vec![
3323            StructField::new("x", DataType::DOUBLE, true),
3324            StructField::new("y", DataType::FLOAT, false),
3325            StructField::new("z", DataType::LONG, true),
3326        ];
3327        let struct_type = StructType::new_unchecked(fields);
3328
3329        // Test DoubleEndedIterator with &StructType into_iter
3330        let mut iter = (&struct_type).into_iter();
3331
3332        // Mix forward and backward iteration
3333        assert_eq!(iter.next().unwrap().name, "x");
3334        assert_eq!(iter.next_back().unwrap().name, "z");
3335        assert_eq!(iter.next().unwrap().name, "y");
3336
3337        // Should be exhausted
3338        assert!(iter.next().is_none());
3339        assert!(iter.next_back().is_none());
3340    }
3341
3342    #[test]
3343    fn test_fused_iterator_ref() {
3344        let fields = vec![
3345            StructField::new("test1", DataType::STRING, true),
3346            StructField::new("test2", DataType::INTEGER, false),
3347        ];
3348        let struct_type = StructType::new_unchecked(fields);
3349
3350        // Verify that reference iterator implements FusedIterator
3351        let mut iter = struct_type.fields();
3352
3353        // Exhaust the iterator
3354        assert!(iter.next().is_some());
3355        assert!(iter.next().is_some());
3356        assert!(iter.next().is_none());
3357
3358        // FusedIterator guarantees that calling next() after exhaustion always returns None
3359        assert!(iter.next().is_none());
3360        assert!(iter.next().is_none());
3361        assert!(iter.next().is_none());
3362    }
3363
3364    #[test]
3365    fn test_fused_iterator_owned() {
3366        let fields = vec![
3367            StructField::new("item1", DataType::STRING, true),
3368            StructField::new("item2", DataType::INTEGER, false),
3369        ];
3370        let struct_type = StructType::new_unchecked(fields);
3371
3372        // Verify that owned iterator implements FusedIterator
3373        let mut iter = struct_type.into_iter();
3374
3375        // Exhaust the iterator
3376        assert!(iter.next().is_some());
3377        assert!(iter.next().is_some());
3378        assert!(iter.next().is_none());
3379
3380        // FusedIterator guarantees that calling next() after exhaustion always returns None
3381        assert!(iter.next().is_none());
3382        assert!(iter.next().is_none());
3383        assert!(iter.next().is_none());
3384    }
3385
3386    #[test]
3387    fn test_fused_iterator_with_into_iter_ref() {
3388        let fields = vec![StructField::new("field_a", DataType::BOOLEAN, true)];
3389        let struct_type = StructType::new_unchecked(fields);
3390
3391        // Verify that &StructType into_iter implements FusedIterator
3392        let mut iter = (&struct_type).into_iter();
3393
3394        // Exhaust the iterator
3395        assert!(iter.next().is_some());
3396        assert!(iter.next().is_none());
3397
3398        // FusedIterator guarantees that calling next() after exhaustion always returns None
3399        assert!(iter.next().is_none());
3400        assert!(iter.next().is_none());
3401    }
3402
3403    #[test]
3404    fn test_fused_double_ended_iterator_empty() {
3405        let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3406
3407        // Test both forward and backward iteration on empty iterator
3408        let mut iter = struct_type.fields();
3409
3410        // Empty iterator should return None immediately
3411        assert!(iter.next().is_none());
3412        assert!(iter.next_back().is_none());
3413
3414        // FusedIterator guarantees continued None after exhaustion
3415        assert!(iter.next().is_none());
3416        assert!(iter.next_back().is_none());
3417    }
3418
3419    #[test]
3420    fn test_double_ended_iterator_single_element() {
3421        let fields = vec![StructField::new("single", DataType::STRING, true)];
3422        let struct_type = StructType::new_unchecked(fields);
3423
3424        // Test DoubleEndedIterator with single element
3425        let mut iter = struct_type.fields();
3426
3427        // Should get the single element from next()
3428        assert_eq!(iter.next().unwrap().name, "single");
3429        assert!(iter.next().is_none());
3430        assert!(iter.next_back().is_none());
3431
3432        // Test getting single element from next_back()
3433        let struct_type =
3434            StructType::new_unchecked([StructField::new("single2", DataType::INTEGER, false)]);
3435        let mut iter = struct_type.into_iter();
3436
3437        assert_eq!(iter.next_back().unwrap().name, "single2");
3438        assert!(iter.next().is_none());
3439        assert!(iter.next_back().is_none());
3440    }
3441
3442    #[test]
3443    fn test_metadata_column_spec() -> DeltaResult<()> {
3444        // Test text_value
3445        assert_eq!(MetadataColumnSpec::RowIndex.text_value(), "row_index");
3446        assert_eq!(MetadataColumnSpec::RowId.text_value(), "row_id");
3447        assert_eq!(
3448            MetadataColumnSpec::RowCommitVersion.text_value(),
3449            "row_commit_version"
3450        );
3451        assert_eq!(MetadataColumnSpec::FilePath.text_value(), "_file");
3452
3453        // Test data_type
3454        assert_eq!(MetadataColumnSpec::RowIndex.data_type(), DataType::LONG);
3455        assert_eq!(MetadataColumnSpec::RowId.data_type(), DataType::LONG);
3456        assert_eq!(
3457            MetadataColumnSpec::RowCommitVersion.data_type(),
3458            DataType::LONG
3459        );
3460        assert_eq!(MetadataColumnSpec::FilePath.data_type(), DataType::STRING);
3461
3462        // Test nullable
3463        assert!(!MetadataColumnSpec::RowIndex.nullable());
3464        assert!(!MetadataColumnSpec::RowId.nullable());
3465        assert!(!MetadataColumnSpec::RowCommitVersion.nullable());
3466        assert!(!MetadataColumnSpec::FilePath.nullable());
3467
3468        // Test reserved_field_id
3469        assert_eq!(MetadataColumnSpec::RowIndex.reserved_field_id(), None);
3470        assert_eq!(MetadataColumnSpec::RowId.reserved_field_id(), None);
3471        assert_eq!(
3472            MetadataColumnSpec::RowCommitVersion.reserved_field_id(),
3473            None
3474        );
3475        assert_eq!(
3476            MetadataColumnSpec::FilePath.reserved_field_id(),
3477            Some(crate::reserved_field_ids::FILE_NAME)
3478        );
3479
3480        // Test from_str
3481        assert_eq!(
3482            MetadataColumnSpec::from_str("row_index")?,
3483            MetadataColumnSpec::RowIndex
3484        );
3485        assert_eq!(
3486            MetadataColumnSpec::from_str("row_id")?,
3487            MetadataColumnSpec::RowId
3488        );
3489        assert_eq!(
3490            MetadataColumnSpec::from_str("row_commit_version")?,
3491            MetadataColumnSpec::RowCommitVersion
3492        );
3493        assert_eq!(
3494            MetadataColumnSpec::from_str("_file")?,
3495            MetadataColumnSpec::FilePath
3496        );
3497
3498        // Test invalid from_str
3499        assert!(MetadataColumnSpec::from_str("invalid").is_err());
3500
3501        Ok(())
3502    }
3503
3504    #[test]
3505    fn test_create_metadata_column() {
3506        let field =
3507            StructField::create_metadata_column("test_row_index", MetadataColumnSpec::RowIndex);
3508
3509        assert_eq!(field.name(), "test_row_index");
3510        assert_eq!(field.data_type(), &DataType::LONG);
3511        assert!(!field.nullable);
3512        assert!(field.is_metadata_column());
3513        assert_eq!(
3514            field.get_metadata_column_spec(),
3515            Some(MetadataColumnSpec::RowIndex)
3516        );
3517    }
3518
3519    #[test]
3520    fn test_default_row_index_column() {
3521        let field = StructField::default_row_index_column();
3522
3523        assert_eq!(field.name(), "_metadata.row_index");
3524        assert_eq!(field.data_type(), &DataType::LONG);
3525        assert!(!field.nullable);
3526        assert!(field.is_metadata_column());
3527        assert_eq!(
3528            field.get_metadata_column_spec(),
3529            Some(MetadataColumnSpec::RowIndex)
3530        );
3531    }
3532
3533    #[test]
3534    fn test_add_column() -> DeltaResult<()> {
3535        let schema = StructType::try_new([StructField::nullable("col1", DataType::STRING)])?;
3536
3537        let new_field = StructField::nullable("col2", DataType::INTEGER);
3538        let updated_schema = schema.add([new_field])?;
3539
3540        assert_eq!(updated_schema.fields().count(), 2);
3541        assert!(updated_schema.contains("col1"));
3542        assert!(updated_schema.contains("col2"));
3543        Ok(())
3544    }
3545
3546    #[test]
3547    fn test_add_metadata_column() -> DeltaResult<()> {
3548        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3549
3550        let schema_with_metadata =
3551            schema.add_metadata_column("my_row_index", MetadataColumnSpec::RowIndex)?;
3552
3553        assert_eq!(schema_with_metadata.fields().count(), 2);
3554        assert!(schema_with_metadata.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3555        assert!(schema_with_metadata.contains("my_row_index"));
3556        assert_eq!(
3557            schema_with_metadata.index_of_metadata_column(&MetadataColumnSpec::RowIndex),
3558            Some(&1)
3559        );
3560        Ok(())
3561    }
3562
3563    #[test]
3564    fn test_duplicate_metadata_columns() -> DeltaResult<()> {
3565        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3566
3567        let schema_with_metadata =
3568            schema.add_metadata_column("row_index1", MetadataColumnSpec::RowIndex)?;
3569
3570        // Adding another row index metadata column should fail
3571        let result =
3572            schema_with_metadata.add_metadata_column("row_index2", MetadataColumnSpec::RowIndex);
3573
3574        assert_result_error_with_message(result, "Duplicate metadata column");
3575        Ok(())
3576    }
3577
3578    #[test]
3579    fn test_duplicate_field_name_case_insensitive() {
3580        // Delta column names are case-insensitive per protocol; (Value, value) is invalid
3581        let result = StructType::try_new([
3582            StructField::nullable("Value", DataType::INTEGER),
3583            StructField::nullable("value", DataType::STRING),
3584        ]);
3585        assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3586    }
3587
3588    #[test]
3589    fn test_duplicate_field_name_exact() {
3590        // Exact duplicate (same name twice) is rejected via the case-insensitive check
3591        let result = StructType::try_new([
3592            StructField::nullable("id", DataType::INTEGER),
3593            StructField::nullable("id", DataType::STRING),
3594        ]);
3595        assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3596    }
3597
3598    #[test]
3599    fn test_nested_metadata_columns_validation_struct() -> DeltaResult<()> {
3600        // Test that metadata columns in nested structs are rejected
3601        let nested_field_with_metadata =
3602            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3603        let nested_struct = StructType {
3604            type_name: "struct".into(),
3605            fields: [(
3606                nested_field_with_metadata.name.clone(),
3607                nested_field_with_metadata,
3608            )]
3609            .into_iter()
3610            .collect(),
3611            metadata_columns: HashMap::new(),
3612        };
3613
3614        let result = StructType::try_new([
3615            StructField::nullable("regular_col", DataType::STRING),
3616            StructField::nullable("nested", DataType::Struct(Box::new(nested_struct))),
3617        ]);
3618
3619        assert_result_error_with_message(result, "only allowed at the top level");
3620        Ok(())
3621    }
3622
3623    #[test]
3624    fn test_nested_metadata_columns_validation_array() -> DeltaResult<()> {
3625        // Test that metadata columns in array element structs are rejected
3626        let nested_field_with_metadata =
3627            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3628        let nested_struct = StructType {
3629            type_name: "struct".into(),
3630            fields: [(
3631                nested_field_with_metadata.name.clone(),
3632                nested_field_with_metadata,
3633            )]
3634            .into_iter()
3635            .collect(),
3636            metadata_columns: HashMap::new(),
3637        };
3638        let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct)), true);
3639
3640        let result = StructType::try_new([
3641            StructField::nullable("regular_col", DataType::STRING),
3642            StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
3643        ]);
3644
3645        assert_result_error_with_message(result, "only allowed at the top level");
3646        Ok(())
3647    }
3648
3649    #[test]
3650    fn test_nested_metadata_columns_validation_map() -> DeltaResult<()> {
3651        // Test that metadata columns in map key structs or map value structs are rejected
3652        let nested_field_with_metadata =
3653            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3654        let nested_struct = StructType {
3655            type_name: "struct".into(),
3656            fields: [(
3657                nested_field_with_metadata.name.clone(),
3658                nested_field_with_metadata,
3659            )]
3660            .into_iter()
3661            .collect(),
3662            metadata_columns: HashMap::new(),
3663        };
3664
3665        for map_type in [
3666            MapType::new(
3667                DataType::Struct(Box::new(nested_struct.clone())),
3668                DataType::STRING,
3669                true,
3670            ),
3671            MapType::new(
3672                DataType::STRING,
3673                DataType::Struct(Box::new(nested_struct)),
3674                true,
3675            ),
3676        ] {
3677            let result = StructType::try_new([
3678                StructField::nullable("regular_col", DataType::STRING),
3679                StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
3680            ]);
3681
3682            assert_result_error_with_message(result, "only allowed at the top level");
3683        }
3684
3685        Ok(())
3686    }
3687
3688    #[test]
3689    fn test_column_identifier_trait() -> DeltaResult<()> {
3690        let schema = StructType::try_new([
3691            StructField::nullable("regular_col", DataType::STRING),
3692            StructField::create_metadata_column("row_index_col", MetadataColumnSpec::RowIndex),
3693        ])?;
3694
3695        // Test string identifier
3696        assert!(schema.contains("regular_col"));
3697        assert!(schema.contains("row_index_col"));
3698        assert!(!schema.contains("nonexistent"));
3699
3700        // Test String identifier
3701        assert!(schema.contains("regular_col"));
3702        assert!(schema.contains("row_index_col"));
3703
3704        // Test MetadataColumnSpec identifier
3705        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3706        assert!(!schema.contains_metadata_column(&MetadataColumnSpec::RowId));
3707        Ok(())
3708    }
3709
3710    #[test]
3711    fn test_metadata_column_serialization() -> DeltaResult<()> {
3712        let field = StructField::create_metadata_column("test_row_id", MetadataColumnSpec::RowId);
3713
3714        // Test that serialization works
3715        let json = serde_json::to_string(&field)?;
3716        let deserialized: StructField = serde_json::from_str(&json)?;
3717
3718        assert_eq!(deserialized.name(), field.name());
3719        assert_eq!(deserialized.data_type(), field.data_type());
3720        assert_eq!(deserialized.nullable, field.nullable);
3721        assert!(deserialized.is_metadata_column());
3722        assert_eq!(
3723            deserialized.get_metadata_column_spec(),
3724            Some(MetadataColumnSpec::RowId)
3725        );
3726        Ok(())
3727    }
3728
3729    #[test]
3730    fn test_all_metadata_column_specs() -> DeltaResult<()> {
3731        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3732
3733        let schema = schema
3734            .add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?
3735            .add_metadata_column("row_id", MetadataColumnSpec::RowId)?
3736            .add_metadata_column("row_commit_version", MetadataColumnSpec::RowCommitVersion)?;
3737
3738        assert_eq!(schema.fields().count(), 4);
3739        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3740        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowId));
3741        assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowCommitVersion));
3742
3743        assert_eq!(
3744            schema.index_of_metadata_column(&MetadataColumnSpec::RowIndex),
3745            Some(&1)
3746        );
3747        assert_eq!(
3748            schema.index_of_metadata_column(&MetadataColumnSpec::RowId),
3749            Some(&2)
3750        );
3751        assert_eq!(
3752            schema.index_of_metadata_column(&MetadataColumnSpec::RowCommitVersion),
3753            Some(&3)
3754        );
3755        Ok(())
3756    }
3757
3758    #[test]
3759    fn test_physical_name_with_mode_none() {
3760        let field_json = r#"{
3761            "name": "logical_name",
3762            "type": "string",
3763            "nullable": true,
3764            "metadata": {
3765                "delta.columnMapping.physicalName": "physical_name_col123"
3766            }
3767        }"#;
3768        let field: StructField = serde_json::from_str(field_json).unwrap();
3769
3770        // With ColumnMappingMode::None, should return logical name even though physical name exists
3771        assert_eq!(field.physical_name(ColumnMappingMode::None), "logical_name");
3772    }
3773
3774    #[test]
3775    fn test_physical_name_with_mode_id() {
3776        let field_json = r#"{
3777            "name": "logical_name",
3778            "type": "string",
3779            "nullable": true,
3780            "metadata": {
3781                "delta.columnMapping.id": 5,
3782                "delta.columnMapping.physicalName": "physical_name_col123"
3783            }
3784        }"#;
3785        let field: StructField = serde_json::from_str(field_json).unwrap();
3786
3787        // With ColumnMappingMode::Id, should return physical name
3788        assert_eq!(
3789            field.physical_name(ColumnMappingMode::Id),
3790            "physical_name_col123"
3791        );
3792    }
3793
3794    #[test]
3795    fn test_physical_name_with_mode_name() {
3796        let field_json = r#"{
3797            "name": "logical_name",
3798            "type": "string",
3799            "nullable": true,
3800            "metadata": {
3801                "delta.columnMapping.physicalName": "physical_name_col456"
3802            }
3803        }"#;
3804        let field: StructField = serde_json::from_str(field_json).unwrap();
3805
3806        // With ColumnMappingMode::Name, should return physical name
3807        assert_eq!(
3808            field.physical_name(ColumnMappingMode::Name),
3809            "physical_name_col456"
3810        );
3811    }
3812
3813    #[test]
3814    fn test_physical_name_fallback_id() {
3815        let field_json = r#"{
3816            "name": "logical_name",
3817            "type": "string",
3818            "nullable": true,
3819            "metadata": {}
3820        }"#;
3821        let field: StructField = serde_json::from_str(field_json).unwrap();
3822
3823        // With ColumnMappingMode::Id but no physical name, should fallback to logical name
3824        assert_eq!(field.physical_name(ColumnMappingMode::Id), "logical_name");
3825    }
3826
3827    #[test]
3828    fn test_physical_name_fallback_name() {
3829        let field_json = r#"{
3830            "name": "logical_name",
3831            "type": "string",
3832            "nullable": true,
3833            "metadata": {}
3834        }"#;
3835        let field: StructField = serde_json::from_str(field_json).unwrap();
3836
3837        // With ColumnMappingMode::Name but no physical name, should fallback to logical name
3838        assert_eq!(field.physical_name(ColumnMappingMode::Name), "logical_name");
3839    }
3840
3841    #[test]
3842    fn test_display_struct_type_stable_output() -> DeltaResult<()> {
3843        let nested_field_with_metadata =
3844            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3845        let inner_struct =
3846            StructType::new_unchecked([StructField::new("q", DataType::LONG, false)]);
3847        let nested_struct = StructType::new_unchecked([
3848            nested_field_with_metadata,
3849            StructField::new("x", DataType::DOUBLE, true),
3850            StructField::new(
3851                "inner_struct",
3852                DataType::Struct(Box::new(inner_struct)),
3853                false,
3854            ),
3855        ]);
3856        let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct.clone())), true);
3857        let map_type = MapType::new(
3858            DataType::Struct(Box::new(nested_struct.clone())),
3859            DataType::Struct(Box::new(nested_struct.clone())), // kek
3860            true,
3861        );
3862        let fields = vec![
3863            StructField::new("x", DataType::DOUBLE, true),
3864            StructField::new("y", DataType::FLOAT, false),
3865            StructField::new("z", DataType::LONG, true),
3866            StructField::new("s", nested_struct.clone(), false),
3867            StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
3868            StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
3869            StructField::new("a", DataType::LONG, true),
3870        ];
3871
3872        let struct_type = StructType::new_unchecked(fields);
3873        assert_eq!(
3874            struct_type.to_string(),
3875            "struct:
3876├─x: double (is nullable: true, metadata: {})
3877├─y: float (is nullable: false, metadata: {})
3878├─z: long (is nullable: true, metadata: {})
3879├─s: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>> (is nullable: false, metadata: {})
3880│  ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3881│  ├─x: double (is nullable: true, metadata: {})
3882│  └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3883│     └─q: long (is nullable: false, metadata: {})
3884├─array_col: array<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
3885│  └─array_element: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
3886│     ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3887│     ├─x: double (is nullable: true, metadata: {})
3888│     └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3889│        └─q: long (is nullable: false, metadata: {})
3890├─map_col: map<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>, struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
3891│  ├─map_key: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
3892│  │  ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3893│  │  ├─x: double (is nullable: true, metadata: {})
3894│  │  └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3895│  │     └─q: long (is nullable: false, metadata: {})
3896│  └─map_value: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
3897│     ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3898│     ├─x: double (is nullable: true, metadata: {})
3899│     └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
3900│        └─q: long (is nullable: false, metadata: {})
3901└─a: long (is nullable: true, metadata: {})
3902"
3903        );
3904
3905        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3906        let schema = schema
3907            .add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?
3908            .add_metadata_column("row_id", MetadataColumnSpec::RowId)?
3909            .add_metadata_column("row_commit_version", MetadataColumnSpec::RowCommitVersion)?;
3910        assert_eq!(schema.to_string(), "struct:
3911├─regular_col: string (is nullable: true, metadata: {})
3912├─row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
3913├─row_id: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_id\")})
3914└─row_commit_version: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_commit_version\")})
3915");
3916        Ok(())
3917    }
3918
/// A builder with no fields added must still produce a schema, and that schema is empty.
#[test]
fn test_builder_empty() {
    let empty = StructType::builder().build().unwrap();
    let field_count = empty.num_fields();
    assert_eq!(field_count, 0);
}
3924
/// Fields added through the builder show up in the built schema in insertion order.
#[test]
fn test_builder_add_fields() {
    let id_field = StructField::new("id", DataType::INTEGER, false);
    let name_field = StructField::new("name", DataType::STRING, true);

    let built = StructType::builder()
        .add_field(id_field)
        .add_field(name_field)
        .build()
        .unwrap();

    assert_eq!(built.num_fields(), 2);
    // Positional lookup must follow the order in which the fields were added.
    for (idx, expected) in ["id", "name"].iter().enumerate() {
        assert_eq!(built.field_at_index(idx).unwrap().name(), *expected);
    }
}
3937
/// A builder seeded from an existing schema keeps that schema's fields and appends new ones
/// after them.
#[test]
fn test_builder_from_schema() {
    let id_only =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let name_field = StructField::new("name", DataType::STRING, true);

    let widened = StructTypeBuilder::from_schema(&id_only)
        .add_field(name_field)
        .build()
        .unwrap();

    assert_eq!(widened.num_fields(), 2);
    // The pre-existing field stays first; the added field follows it.
    for (idx, expected) in ["id", "name"].iter().enumerate() {
        assert_eq!(widened.field_at_index(idx).unwrap().name(), *expected);
    }
}
3952
/// Pins the string form of `ColumnMetadataKey::ParquetFieldId`.
#[test]
fn test_parquet_field_id_key_value() {
    // The key's string value follows the convention used by delta-spark and other Delta
    // ecosystem implementations. It is not part of the Delta protocol spec, so the value is
    // pinned here to catch accidental changes.
    let key = ColumnMetadataKey::ParquetFieldId.as_ref();
    assert_eq!(key, "parquet.field.id");
}
3963
/// Inserting after `None` into an empty struct yields a schema holding just the new field.
#[test]
fn test_with_field_inserted_empty_struct() {
    let age = StructField::new("age", DataType::STRING, true);
    let empty = StructType::try_new([]).unwrap();

    let updated = empty
        .with_field_inserted_after(None, age)
        .expect("with field inserted should produce a valid schema");

    assert_eq!(updated.num_fields(), 1);
    let only_field = updated.field_at_index(0).unwrap();
    assert_eq!(only_field.name(), "age");
}
3973
/// Inserting after a named field places the new field immediately behind that anchor.
#[test]
fn test_with_field_inserted() {
    let base = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = base
        .with_field_inserted_after(Some("id"), age)
        .expect("with field inserted should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    // "age" lands between the anchor "id" and the trailing "name".
    for (idx, expected) in ["id", "age", "name"].iter().enumerate() {
        assert_eq!(updated.field_at_index(idx).unwrap().name(), *expected);
    }
}
3989
/// Inserting after `None` on a non-empty struct appends the new field at the end.
#[test]
fn test_with_field_inserted_append_to_end() {
    let base = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = base
        .with_field_inserted_after(None, age)
        .expect("with field inserted should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    // Existing fields keep their positions; the new one comes last.
    for (idx, expected) in ["id", "name", "age"].iter().enumerate() {
        assert_eq!(updated.field_at_index(idx).unwrap().name(), *expected);
    }
}
4006
/// Inserting after an anchor name that is absent from the schema must return an error.
#[test]
fn test_with_field_inserted_after_non_existent_field() {
    let schema =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let new_schema = schema.with_field_inserted_after(
        Some("nonexistent"),
        StructField::new("name", DataType::STRING, true),
    );
    // Carry a failure message, matching the style of the sibling
    // `test_with_field_replaced_non_existent_field`, so a regression is self-explanatory.
    assert!(
        new_schema.is_err(),
        "Expected error when inserting after a non-existent field"
    );
}
4017
/// Inserting (after an anchor) a field whose name is already in the schema must fail with a
/// "Field id already exists" error.
#[test]
fn test_with_field_inserted_after_duplicate_field() {
    let base = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    // Same name as the existing "id" field, different type.
    let clashing = StructField::new("id", DataType::STRING, true);

    let outcome = base.with_field_inserted_after(Some("name"), clashing);

    assert!(outcome.is_err());
    assert_result_error_with_message(outcome, "Field id already exists");
}
4032
/// Inserting before a named field places the new field immediately in front of that anchor.
#[test]
fn test_with_field_inserted_before() {
    let base = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = base
        .with_field_inserted_before(Some("name"), age)
        .expect("with field inserted before should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    // "age" is squeezed in ahead of the anchor "name".
    for (idx, expected) in ["id", "age", "name"].iter().enumerate() {
        assert_eq!(updated.field_at_index(idx).unwrap().name(), *expected);
    }
}
4051
/// Inserting (before an anchor) a field whose name is already in the schema must fail with a
/// "Field id already exists" error.
#[test]
fn test_with_field_inserted_before_duplicate_field() {
    let base = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    // Same name as the existing "id" field, different type.
    let clashing = StructField::new("id", DataType::STRING, true);

    let outcome = base.with_field_inserted_before(Some("name"), clashing);

    assert!(outcome.is_err());
    assert_result_error_with_message(outcome, "Field id already exists");
}
4066
/// Inserting before `None` on a non-empty struct puts the new field at the front.
#[test]
fn test_with_field_inserted_before_at_beginning() {
    let base = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = base
        .with_field_inserted_before(None, age)
        .expect("with field inserted before should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    // The new field leads; the original fields follow in their original order.
    for (idx, expected) in ["age", "id", "name"].iter().enumerate() {
        assert_eq!(updated.field_at_index(idx).unwrap().name(), *expected);
    }
}
4082
/// Inserting before an anchor name that is absent from the schema must return an error.
#[test]
fn test_with_field_inserted_before_non_existent_field() {
    let schema =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let new_schema = schema.with_field_inserted_before(
        Some("nonexistent"),
        StructField::new("name", DataType::STRING, true),
    );
    // Carry a failure message, matching the style of the sibling
    // `test_with_field_replaced_non_existent_field`, so a regression is self-explanatory.
    assert!(
        new_schema.is_err(),
        "Expected error when inserting before a non-existent field"
    );
}
4093
/// Inserting before `None` into an empty struct yields a schema holding just the new field.
#[test]
fn test_with_field_inserted_before_empty_struct() {
    let age = StructField::new("age", DataType::STRING, true);
    let empty = StructType::try_new([]).unwrap();

    let updated = empty
        .with_field_inserted_before(None, age)
        .expect("with field inserted before on empty struct should succeed");

    assert_eq!(updated.num_fields(), 1);
    let only_field = updated.field_at_index(0).unwrap();
    assert_eq!(only_field.name(), "age");
}
4103
/// Removing the only field of a struct leaves an empty schema.
#[test]
fn test_with_field_removed() {
    let single = StructField::new("id", DataType::INTEGER, false);
    let base = StructType::try_new([single]).unwrap();

    let trimmed = base.with_field_removed("id");

    assert_eq!(trimmed.num_fields(), 0);
}
4111
/// Removing a name that is not in the schema leaves the existing fields untouched.
#[test]
fn test_with_field_removed_non_existent_field() {
    let single = StructField::new("id", DataType::INTEGER, false);
    let base = StructType::try_new([single]).unwrap();

    let unchanged = base.with_field_removed("nonexistent");

    assert_eq!(unchanged.num_fields(), 1);
    let survivor = unchanged.field_at_index(0).unwrap();
    assert_eq!(survivor.name(), "id");
}
4120
/// Replacing a field swaps in the whole replacement field at the same position.
///
/// The original `id` field is INTEGER and the replacement `name` field is STRING, so the
/// data type is checked as well as the name: a rename-only implementation would otherwise
/// pass this test.
#[test]
fn test_with_field_replaced() {
    let schema =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let new_schema = schema
        .with_field_replaced("id", StructField::new("name", DataType::STRING, true))
        .unwrap();

    assert_eq!(new_schema.num_fields(), 1);
    let replaced = new_schema.field_at_index(0).unwrap();
    assert_eq!(replaced.name(), "name");
    // The replacement's type must come along with its name.
    assert_eq!(replaced.data_type(), &DataType::STRING);
}
4132
/// Replacing a field name that is not in the schema must return an error.
#[test]
fn test_with_field_replaced_non_existent_field() {
    let single = StructField::new("id", DataType::INTEGER, false);
    let base = StructType::try_new([single]).unwrap();
    let replacement = StructField::new("name", DataType::STRING, true);

    let outcome = base.with_field_replaced("nonexistent", replacement);

    assert!(outcome.is_err(), "Expected error for non-existent field");
}
4143
4144    /// Schema: { a: { b: { c: double } } } — supports walks at depths 1, 2, and 3.
4145    fn walk_test_schema() -> StructType {
4146        let l3 = StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)]);
4147        let l2 = StructType::new_unchecked([StructField::new(
4148            "b",
4149            DataType::Struct(Box::new(l3)),
4150            false,
4151        )]);
4152        StructType::new_unchecked([StructField::new("a", DataType::Struct(Box::new(l2)), false)])
4153    }
4154
4155    #[rstest::rstest]
4156    #[case::single_level(vec!["a"], vec!["a"], DataType::Struct(Box::new(
4157        StructType::new_unchecked([StructField::new("b", DataType::Struct(Box::new(
4158            StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4159        )), false)])
4160    )))]
4161    #[case::nested_2(vec!["a", "b"], vec!["a", "b"], DataType::Struct(Box::new(
4162        StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4163    )))]
4164    #[case::nested_3(vec!["a", "b", "c"], vec!["a", "b", "c"], DataType::DOUBLE)]
4165    #[test]
4166    fn test_walk_column_fields_happy(
4167        #[case] col_path: Vec<&str>,
4168        #[case] expected_names: Vec<&str>,
4169        #[case] expected_leaf_type: DataType,
4170    ) {
4171        let schema = walk_test_schema();
4172        let fields = schema
4173            .walk_column_fields(&ColumnName::new(col_path.iter().copied()))
4174            .unwrap();
4175        assert_eq!(fields.len(), expected_names.len());
4176        for (field, name) in fields.iter().zip(expected_names.iter()) {
4177            assert_eq!(field.name(), *name);
4178        }
4179        assert_eq!(fields.last().unwrap().data_type(), &expected_leaf_type);
4180    }
4181
4182    #[rstest::rstest]
4183    #[case::empty_path(vec![], "Column path cannot be empty")]
4184    #[case::not_found_top(vec!["x"], "not found in schema")]
4185    #[case::not_found_nested(vec!["a", "x"], "not found in schema")]
4186    #[case::intermediate_not_struct(vec!["a", "b", "c", "d"], "not a struct type")]
4187    #[test]
4188    fn test_walk_column_fields_error(#[case] col_path: Vec<&str>, #[case] expected_error: &str) {
4189        let schema = walk_test_schema();
4190        let result = schema.walk_column_fields(&ColumnName::new(col_path.iter().copied()));
4191        assert_result_error_with_message(result, expected_error);
4192    }
4193
/// Column paths are rewritten to use the schema's own casing, segment by segment; names the
/// schema does not contain are passed through as given.
#[test]
fn test_normalize_column_names_to_schema_casing() {
    let address = StructType::new_unchecked([StructField::new("City", DataType::STRING, false)]);
    let schema = StructType::new_unchecked([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("EventDate", DataType::DATE, false),
        StructField::new("Address", DataType::Struct(Box::new(address)), false),
    ]);

    // Mismatched casing -> normalized to schema
    let lowercase = vec![ColumnName::new(["eventdate"])];
    let normalized = normalize_column_names_to_schema_casing(&schema, &lowercase);
    assert_eq!(normalized[0].path(), ["EventDate"]);

    // Nested path -> each field name normalized
    let nested = vec![ColumnName::new(["address", "city"])];
    let normalized = normalize_column_names_to_schema_casing(&schema, &nested);
    assert_eq!(normalized[0].path(), ["Address", "City"]);

    // Already matching -> unchanged
    let exact = vec![ColumnName::new(["id"])];
    let normalized = normalize_column_names_to_schema_casing(&schema, &exact);
    assert_eq!(normalized[0].path(), ["id"]);

    // Unrecognized -> keeps original
    let unknown = vec![ColumnName::new(["nonexistent"])];
    let normalized = normalize_column_names_to_schema_casing(&schema, &unknown);
    assert_eq!(normalized[0].path(), ["nonexistent"]);
}
4232}