1use std::borrow::Cow;
4use std::collections::{HashMap, HashSet};
5use std::fmt::{Debug, Display, Formatter};
6use std::iter::{DoubleEndedIterator, FusedIterator};
7use std::str::FromStr;
8use std::sync::{Arc, LazyLock};
9
10use delta_kernel_derive::internal_api;
11use indexmap::IndexMap;
12use itertools::Itertools;
13use serde::{Deserialize, Serialize};
14use tracing::warn;
15
16pub(crate) use crate::expressions::{column_name, ColumnName};
18use crate::reserved_field_ids::FILE_NAME;
19use crate::table_features::{
20 validate_and_extract_column_mapping_annotations, validate_column_mapping_id, ColumnMappingMode,
21};
22use crate::transforms::{transform_output_type, SchemaTransform};
23use crate::utils::require;
24use crate::{DeltaResult, Error};
25
pub(crate) mod compare;
#[cfg(feature = "schema-diff")]
pub(crate) mod diff;

// `derive_macro_utils` is exported publicly only when the `internal-api` feature is enabled.
#[cfg(feature = "internal-api")]
pub mod derive_macro_utils;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod derive_macro_utils;
pub(crate) mod validation;
pub(crate) mod variant_utils;

/// A schema is a top-level [`StructType`].
pub type Schema = StructType;
/// A shared, reference-counted [`StructType`].
pub type SchemaRef = Arc<StructType>;
39
#[internal_api]
/// Types that can describe their own layout as a [`StructType`].
pub(crate) trait ToSchema {
    /// Returns the schema describing `Self`.
    fn to_schema() -> StructType;
}
46
/// A value stored in a [`StructField`]'s metadata map.
///
/// The enum is `#[serde(untagged)]`, so serde tries the variants in declaration order when
/// deserializing. Any JSON value that is not an `i64`, a string, or a boolean (for example a
/// float, an array, or an object) ends up in [`MetadataValue::Other`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[serde(untagged)]
pub enum MetadataValue {
    /// An integer value.
    Number(i64),
    /// A string value.
    String(String),
    /// A boolean value.
    Boolean(bool),
    /// Any other JSON value, kept verbatim.
    Other(serde_json::Value),
}
59
60impl Display for MetadataValue {
61 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62 match self {
63 MetadataValue::Number(n) => write!(f, "{n}"),
64 MetadataValue::String(s) => write!(f, "{s}"),
65 MetadataValue::Boolean(b) => write!(f, "{b}"),
66 MetadataValue::Other(v) => write!(f, "{v}"), }
68 }
69}
70
71impl From<String> for MetadataValue {
72 fn from(value: String) -> Self {
73 Self::String(value)
74 }
75}
76
77impl From<&String> for MetadataValue {
78 fn from(value: &String) -> Self {
79 Self::String(value.clone())
80 }
81}
82
83impl From<&str> for MetadataValue {
84 fn from(value: &str) -> Self {
85 Self::String(value.to_string())
86 }
87}
88
89impl From<i64> for MetadataValue {
90 fn from(value: i64) -> Self {
91 Self::Number(value)
92 }
93}
94
95impl From<bool> for MetadataValue {
96 fn from(value: bool) -> Self {
97 Self::Boolean(value)
98 }
99}
100
/// Well-known keys that may appear in a [`StructField`]'s metadata map.
///
/// The `AsRef<str>` impl for this enum gives the literal metadata key for each variant
/// (for example `ColumnMappingId` is `"delta.columnMapping.id"`).
#[derive(Debug)]
pub enum ColumnMetadataKey {
    ColumnMappingId,
    ColumnMappingPhysicalName,
    ColumnMappingNestedIds,
    ParquetFieldId,
    ParquetFieldNestedIds,
    GenerationExpression,
    IdentityStart,
    IdentityStep,
    IdentityHighWaterMark,
    IdentityAllowExplicitInsert,
    InternalColumn,
    Invariants,
    MetadataSpec,
}
135
136impl AsRef<str> for ColumnMetadataKey {
137 fn as_ref(&self) -> &str {
138 match self {
139 Self::ColumnMappingId => "delta.columnMapping.id",
140 Self::ColumnMappingPhysicalName => "delta.columnMapping.physicalName",
141 Self::ColumnMappingNestedIds => "delta.columnMapping.nested.ids",
142 Self::ParquetFieldId => "parquet.field.id",
146 Self::ParquetFieldNestedIds => "parquet.field.nested.ids",
151 Self::GenerationExpression => "delta.generationExpression",
152 Self::IdentityAllowExplicitInsert => "delta.identity.allowExplicitInsert",
153 Self::IdentityHighWaterMark => "delta.identity.highWaterMark",
154 Self::IdentityStart => "delta.identity.start",
155 Self::IdentityStep => "delta.identity.step",
156 Self::InternalColumn => "delta.isInternalColumn",
157 Self::Invariants => "delta.invariants",
158 Self::MetadataSpec => "delta.metadataSpec",
159 }
160 }
161}
162
/// The kinds of metadata columns a schema can request.
///
/// A field becomes a metadata column when it carries a `delta.metadataSpec` annotation whose
/// string value parses (via `FromStr`) into one of these variants.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum MetadataColumnSpec {
    RowIndex,
    RowId,
    RowCommitVersion,
    FilePath,
}
173
174impl MetadataColumnSpec {
175 pub fn text_value(&self) -> &'static str {
177 match self {
178 Self::RowIndex => "row_index",
179 Self::RowId => "row_id",
180 Self::RowCommitVersion => "row_commit_version",
181 Self::FilePath => "_file",
182 }
183 }
184
185 pub fn data_type(&self) -> DataType {
187 match self {
188 Self::RowIndex => DataType::LONG,
189 Self::RowId => DataType::LONG,
190 Self::RowCommitVersion => DataType::LONG,
191 Self::FilePath => DataType::STRING,
192 }
193 }
194
195 pub fn nullable(&self) -> bool {
197 match self {
198 Self::RowIndex => false,
199 Self::RowId => false,
200 Self::RowCommitVersion => false,
201 Self::FilePath => false,
202 }
203 }
204
205 pub fn reserved_field_id(&self) -> Option<i64> {
207 match self {
208 Self::FilePath => Some(FILE_NAME),
209 _ => None,
210 }
211 }
212}
213
214impl FromStr for MetadataColumnSpec {
215 type Err = Error;
216
217 fn from_str(s: &str) -> Result<Self, Self::Err> {
218 match s {
219 "row_index" => Ok(Self::RowIndex),
220 "row_id" => Ok(Self::RowId),
221 "row_commit_version" => Ok(Self::RowCommitVersion),
222 "_file" => Ok(Self::FilePath),
223 _ => Err(Error::Schema(format!("Unknown metadata column spec: {s}"))),
224 }
225 }
226}
227
/// A single named, typed field of a [`StructType`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
pub struct StructField {
    /// Name of this field (it may itself be nested inside another struct).
    pub name: String,
    /// The data type of this field; serialized under the `type` key.
    #[serde(rename = "type")]
    pub data_type: DataType,
    /// Whether this field may contain nulls.
    pub nullable: bool,
    /// Key/value annotations for this field; see [`ColumnMetadataKey`] for well-known keys.
    pub metadata: HashMap<String, MetadataValue>,
}
240
/// Column-mapping annotations already present on a field, as returned by
/// `StructField::validate_and_extract_existing_column_mapping_annotations`.
///
/// The physical name borrows from the field's metadata map.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ExistingColumnMappingAnnotations<'a> {
    /// The validated `delta.columnMapping.id`, if present.
    pub id: Option<i64>,
    /// The non-empty `delta.columnMapping.physicalName`, if present.
    pub physical_name: Option<&'a str>,
}
252
253impl StructField {
254 const DEFAULT_ROW_INDEX_COLUMN_NAME: &'static str = "_metadata.row_index";
259
260 pub fn new(name: impl Into<String>, data_type: impl Into<DataType>, nullable: bool) -> Self {
266 Self {
267 name: name.into(),
268 data_type: data_type.into(),
269 nullable,
270 metadata: HashMap::default(),
271 }
272 }
273
274 pub fn nullable(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
276 Self::new(name, data_type, true)
277 }
278
279 pub fn not_null(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
281 Self::new(name, data_type, false)
282 }
283
284 pub fn create_metadata_column(name: impl Into<String>, spec: MetadataColumnSpec) -> Self {
286 let mut metadata = HashMap::new();
287 metadata.insert(
288 ColumnMetadataKey::MetadataSpec.as_ref().to_string(),
289 MetadataValue::String(spec.text_value().to_string()),
290 );
291
292 Self {
293 name: name.into(),
294 data_type: spec.data_type(),
295 nullable: spec.nullable(),
296 metadata,
297 }
298 }
299
300 pub fn default_row_index_column() -> &'static StructField {
302 static DEFAULT_ROW_INDEX_COLUMN: LazyLock<StructField> = LazyLock::new(|| {
303 StructField::create_metadata_column(
304 StructField::DEFAULT_ROW_INDEX_COLUMN_NAME,
305 MetadataColumnSpec::RowIndex,
306 )
307 });
308 &DEFAULT_ROW_INDEX_COLUMN
309 }
310
311 pub fn with_metadata(
317 mut self,
318 metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
319 ) -> Self {
320 self.metadata = metadata
321 .into_iter()
322 .map(|(k, v)| (k.into(), v.into()))
323 .collect();
324 self
325 }
326
327 pub fn add_metadata(
329 mut self,
330 metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
331 ) -> Self {
332 self.metadata
333 .extend(metadata.into_iter().map(|(k, v)| (k.into(), v.into())));
334 self
335 }
336
337 pub fn is_metadata_column(&self) -> bool {
339 self.metadata
340 .contains_key(ColumnMetadataKey::MetadataSpec.as_ref())
341 }
342
343 pub fn get_metadata_column_spec(&self) -> Option<MetadataColumnSpec> {
345 match self
346 .metadata
347 .get(ColumnMetadataKey::MetadataSpec.as_ref())?
348 {
349 MetadataValue::String(s) => MetadataColumnSpec::from_str(s).ok(),
350 _ => None,
351 }
352 }
353
354 pub fn is_internal_column(&self) -> bool {
358 matches!(
359 self.metadata
360 .get(ColumnMetadataKey::InternalColumn.as_ref()),
361 Some(MetadataValue::Boolean(true))
362 )
363 }
364
365 pub fn as_internal_column(self) -> Self {
367 self.add_metadata(vec![(
368 ColumnMetadataKey::InternalColumn.as_ref().to_string(),
369 MetadataValue::Boolean(true),
370 )])
371 }
372
373 pub fn get_config_value(&self, key: &ColumnMetadataKey) -> Option<&MetadataValue> {
374 self.metadata.get(key.as_ref())
375 }
376
377 pub fn column_mapping_id(&self) -> Option<i64> {
380 match self.get_config_value(&ColumnMetadataKey::ColumnMappingId)? {
381 MetadataValue::Number(n) => Some(*n),
382 _ => None,
383 }
384 }
385
386 pub(crate) fn validate_and_extract_existing_column_mapping_annotations(
407 &self,
408 ) -> DeltaResult<ExistingColumnMappingAnnotations<'_>> {
409 let id = match self.get_config_value(&ColumnMetadataKey::ColumnMappingId) {
410 Some(MetadataValue::Number(n)) => {
411 validate_column_mapping_id(*n)
412 .map_err(|e| Error::schema(format!("Field '{}': {e}", self.name)))?;
413 Some(*n)
414 }
415 None => None,
416 Some(_) => {
417 return Err(Error::schema(format!(
418 "Field '{}' has a non-numeric `{}` annotation",
419 self.name,
420 ColumnMetadataKey::ColumnMappingId.as_ref(),
421 )));
422 }
423 };
424 let physical_name =
425 match self.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) {
426 Some(MetadataValue::String(s)) if s.is_empty() => {
427 return Err(Error::schema(format!(
428 "Field '{}' has an empty `{}` annotation",
429 self.name,
430 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
431 )));
432 }
433 Some(MetadataValue::String(s)) => Some(s.as_str()),
434 None => None,
435 Some(_) => {
436 return Err(Error::schema(format!(
437 "Field '{}' has a non-string `{}` annotation",
438 self.name,
439 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
440 )));
441 }
442 };
443 Ok(ExistingColumnMappingAnnotations { id, physical_name })
444 }
445
446 #[cfg(any(test, feature = "test-utils"))]
450 pub fn collect_column_mapping_ids(&self) -> Vec<i64> {
451 struct CollectIds(Vec<i64>);
452 impl<'a> SchemaTransform<'a> for CollectIds {
453 transform_output_type!(|'a, T| ());
454
455 fn transform_struct_field(&mut self, field: &'a StructField) {
456 if let Some(id) = field.column_mapping_id() {
457 self.0.push(id);
458 }
459 self.recurse_into_struct_field(field)
460 }
461 }
462 let mut visitor = CollectIds(Vec::new());
463 visitor.transform_struct_field(self);
464 visitor.0
465 }
466
467 #[internal_api]
477 pub(crate) fn physical_name(&self, column_mapping_mode: ColumnMappingMode) -> &str {
478 match column_mapping_mode {
479 ColumnMappingMode::None => &self.name,
480 ColumnMappingMode::Id | ColumnMappingMode::Name => {
481 match self
482 .metadata
483 .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
484 {
485 Some(MetadataValue::String(physical_name)) => physical_name,
486 _ => &self.name,
487 }
488 }
489 }
490 }
491
492 pub(crate) fn has_physical_name_annotation(&self) -> bool {
495 matches!(
496 self.metadata
497 .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()),
498 Some(MetadataValue::String(_))
499 )
500 }
501
502 pub(crate) fn has_id_annotation(&self) -> bool {
505 matches!(
506 self.metadata
507 .get(ColumnMetadataKey::ColumnMappingId.as_ref()),
508 Some(MetadataValue::Number(_))
509 )
510 }
511
512 pub fn with_name(&self, new_name: impl Into<String>) -> Self {
515 StructField {
516 name: new_name.into(),
517 data_type: self.data_type().clone(),
518 nullable: self.nullable,
519 metadata: self.metadata.clone(),
520 }
521 }
522
523 #[inline]
524 pub fn name(&self) -> &String {
525 &self.name
526 }
527
528 #[inline]
529 pub fn is_nullable(&self) -> bool {
530 self.nullable
531 }
532
533 #[inline]
534 pub const fn data_type(&self) -> &DataType {
535 &self.data_type
536 }
537
538 #[inline]
539 pub const fn metadata(&self) -> &HashMap<String, MetadataValue> {
540 &self.metadata
541 }
542
543 pub fn metadata_with_string_values(&self) -> HashMap<String, String> {
546 self.metadata
547 .iter()
548 .map(|(key, val)| (key.clone(), val.to_string()))
549 .collect()
550 }
551
552 #[internal_api]
572 pub(crate) fn make_physical(
573 &self,
574 column_mapping_mode: ColumnMappingMode,
575 ) -> DeltaResult<Self> {
576 MakePhysical::new(column_mapping_mode)
577 .transform_struct_field(self)
578 .map(|f| f.into_owned())
579 }
580
581 pub(crate) fn has_invariants(&self) -> bool {
582 self.metadata
583 .contains_key(ColumnMetadataKey::Invariants.as_ref())
584 }
585
586 fn logical_to_physical_metadata(
595 &self,
596 column_mapping_mode: ColumnMappingMode,
597 ) -> HashMap<String, MetadataValue> {
598 let mut base_metadata = self.metadata.clone();
599 let physical_name_key = ColumnMetadataKey::ColumnMappingPhysicalName.as_ref();
600 let field_id_key = ColumnMetadataKey::ColumnMappingId.as_ref();
601 let parquet_field_id_key = ColumnMetadataKey::ParquetFieldId.as_ref();
602 let field_id = base_metadata.get(ColumnMetadataKey::ColumnMappingId.as_ref());
603 match column_mapping_mode {
604 ColumnMappingMode::Id => {
605 let Some(MetadataValue::Number(fid)) = field_id else {
606 warn!("StructField with name {} is missing field id in the Id column mapping mode", self.name());
609 debug_assert!(false);
610 return base_metadata;
611 };
612 base_metadata.insert(
614 parquet_field_id_key.to_string(),
615 MetadataValue::Number(*fid),
616 );
617 debug_assert!(base_metadata.contains_key(physical_name_key));
619 }
620 ColumnMappingMode::Name => {
621 debug_assert!(base_metadata.contains_key(physical_name_key));
623 debug_assert!(base_metadata.contains_key(field_id_key));
624
625 let Some(MetadataValue::Number(fid)) = field_id else {
629 warn!("StructField with name {} is missing field id in the Name column mapping mode", self.name());
630 debug_assert!(false);
631 return base_metadata;
632 };
633 base_metadata.insert(
634 parquet_field_id_key.to_string(),
635 MetadataValue::Number(*fid),
636 );
637 }
639 ColumnMappingMode::None => {
640 base_metadata.remove(physical_name_key);
641 base_metadata.remove(field_id_key);
642 base_metadata.remove(parquet_field_id_key);
643 }
645 }
646 base_metadata
647 }
648}
649
650impl Display for StructField {
651 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
652 let mut metadata_str = String::from("{");
653 let mut first = true;
654 for (k, v) in self.metadata.iter() {
655 if !first {
656 metadata_str.push_str(", ");
657 }
658 first = false;
659 metadata_str.push_str(&format!("{k}: {v:?}"));
660 }
661 metadata_str.push('}');
662 write!(
663 f,
664 "{}: {} (is nullable: {}, metadata: {})",
665 self.name, self.data_type, self.nullable, metadata_str,
666 )
667 }
668}
669
/// An ordered collection of uniquely named [`StructField`]s.
#[derive(Debug, PartialEq, Clone, Eq)]
pub struct StructType {
    /// Type tag; the constructors in this module always set it to `"struct"`.
    type_name: String,
    /// Fields keyed by name, in schema order.
    fields: IndexMap<String, StructField>,
    /// Maps each metadata column spec present at the top level to the positional index of its
    /// field in `fields`. Any code that reorders `fields` must keep these indices in sync.
    metadata_columns: HashMap<MetadataColumnSpec, usize>,
}
685
686pub struct StructTypeBuilder {
687 fields: IndexMap<String, StructField>,
688}
689
690impl Default for StructTypeBuilder {
691 fn default() -> Self {
692 Self::new()
693 }
694}
695
696impl StructTypeBuilder {
697 pub fn new() -> Self {
698 Self {
699 fields: IndexMap::new(),
700 }
701 }
702
703 pub fn from_schema(schema: &StructType) -> Self {
704 Self {
705 fields: schema.fields.clone(),
706 }
707 }
708
709 pub fn add_field(mut self, field: StructField) -> Self {
710 self.fields.insert(field.name.clone(), field);
711 self
712 }
713
714 pub fn build(self) -> DeltaResult<StructType> {
715 StructType::try_new(self.fields.into_values())
716 }
717
718 pub fn build_arc_unchecked(self) -> Arc<StructType> {
719 Arc::new(StructType::new_unchecked(self.fields.into_values()))
720 }
721}
722
723impl StructType {
724 pub fn try_new(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
732 let mut field_map = IndexMap::new();
733 let mut metadata_columns = HashMap::new();
734 let mut seen_lowercase_names = HashSet::new();
735
736 for (i, field) in fields.into_iter().enumerate() {
738 if !matches!(field.data_type, DataType::Primitive(_)) {
740 Self::ensure_no_metadata_columns_in_field(&field)?;
741 }
742
743 if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
745 if metadata_columns.insert(metadata_column_spec, i).is_some() {
746 return Err(Error::schema(format!(
747 "Duplicate metadata column: {metadata_column_spec:?}",
748 )));
749 }
750 }
751
752 let key = field.name.to_lowercase();
755 if !seen_lowercase_names.insert(key) {
756 return Err(Error::schema(format!(
757 "Duplicate field name (case-insensitive): '{}'",
758 field.name
759 )));
760 }
761
762 field_map.insert(field.name.clone(), field);
763 }
764
765 Ok(Self {
766 type_name: "struct".into(),
767 fields: field_map,
768 metadata_columns,
769 })
770 }
771
772 pub fn try_from_results<E: Into<Error>>(
777 fields: impl IntoIterator<Item = Result<StructField, E>>,
778 ) -> DeltaResult<Self> {
779 fields
780 .into_iter()
781 .map(|result| result.map_err(Into::into))
782 .process_results(|iter| Self::try_new(iter))?
783 }
784
785 pub fn builder() -> StructTypeBuilder {
786 StructTypeBuilder::new()
787 }
788
789 #[internal_api]
794 pub(crate) fn new_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
795 let mut field_map = IndexMap::new();
796 let mut metadata_columns = HashMap::new();
797
798 for (i, field) in fields.into_iter().enumerate() {
799 if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
800 metadata_columns.insert(metadata_column_spec, i);
801 }
802 field_map.insert(field.name.clone(), field);
803 }
804
805 Self {
806 type_name: "struct".into(),
807 fields: field_map,
808 metadata_columns,
809 }
810 }
811
812 pub fn project_as_struct(&self, names: &[impl AsRef<str>]) -> DeltaResult<StructType> {
816 let fields = names.iter().map(|name| {
817 self.fields
818 .get(name.as_ref())
819 .cloned()
820 .ok_or_else(|| Error::missing_column(name.as_ref()))
821 });
822 Self::try_from_results(fields)
823 }
824
825 pub fn project(&self, names: &[impl AsRef<str>]) -> DeltaResult<SchemaRef> {
829 let struct_type = self.project_as_struct(names)?;
830 Ok(Arc::new(struct_type))
831 }
832
833 pub fn add(&self, fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
835 Self::try_new(self.fields.values().cloned().chain(fields))
836 }
837
838 pub fn add_metadata_column(
840 &self,
841 name: impl Into<String>,
842 spec: MetadataColumnSpec,
843 ) -> DeltaResult<Self> {
844 self.add([StructField::create_metadata_column(name, spec)])
845 }
846
847 pub fn index_of(&self, name: impl AsRef<str>) -> Option<usize> {
849 self.fields.get_index_of(name.as_ref())
850 }
851
852 pub fn index_of_metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&usize> {
854 self.metadata_columns.get(spec)
855 }
856
857 pub fn contains(&self, name: impl AsRef<str>) -> bool {
859 self.fields.contains_key(name.as_ref())
860 }
861
862 pub fn contains_metadata_column(&self, spec: &MetadataColumnSpec) -> bool {
864 self.metadata_columns.contains_key(spec)
865 }
866
867 pub fn field(&self, name: impl AsRef<str>) -> Option<&StructField> {
869 self.fields.get(name.as_ref())
870 }
871
872 #[internal_api]
881 pub(crate) fn walk_column_fields<'a>(
882 &'a self,
883 col: &ColumnName,
884 ) -> DeltaResult<Vec<&'a StructField>> {
885 self.walk_column_fields_by(col, |s, name| s.field(name))
886 }
887
888 pub(crate) fn walk_column_fields_by<'a, F>(
894 &'a self,
895 col: &ColumnName,
896 find_field: F,
897 ) -> DeltaResult<Vec<&'a StructField>>
898 where
899 F: for<'b> Fn(&'b StructType, &str) -> Option<&'b StructField>,
900 {
901 let path = col.path();
902 if path.is_empty() {
903 return Err(Error::generic("Column path cannot be empty"));
904 }
905 let mut current_struct = self;
906 let mut fields = Vec::with_capacity(path.len());
907 for (i, field_name) in path.iter().enumerate() {
908 let field = find_field(current_struct, field_name).ok_or_else(|| {
909 Error::generic(format!(
910 "Could not resolve column '{col}': field '{field_name}' not found in schema"
911 ))
912 })?;
913 fields.push(field);
914 if i < path.len() - 1 {
915 let DataType::Struct(inner) = field.data_type() else {
916 return Err(Error::generic(format!(
917 "Cannot resolve column '{col}': intermediate field '{field_name}' \
918 is not a struct type"
919 )));
920 };
921 current_struct = inner;
922 }
923 }
924 Ok(fields)
925 }
926
927 pub fn field_with_index(&self, name: impl AsRef<str>) -> Option<(usize, &StructField)> {
929 self.fields
930 .get_full(name.as_ref())
931 .map(|(index, _, field)| (index, field))
932 }
933
934 pub fn field_at_index(&self, index: usize) -> Option<&StructField> {
936 self.fields.get_index(index).map(|(_, field)| field)
937 }
938
939 pub fn fields(
941 &self,
942 ) -> impl ExactSizeIterator<Item = &StructField> + DoubleEndedIterator + FusedIterator {
943 self.fields.values()
944 }
945
946 pub fn into_fields(
948 self,
949 ) -> impl ExactSizeIterator<Item = StructField> + DoubleEndedIterator + FusedIterator {
950 self.fields.into_values()
951 }
952
953 pub(crate) fn field_map_mut(&mut self) -> &mut IndexMap<String, StructField> {
955 &mut self.fields
956 }
957
958 #[cfg(any(test, feature = "test-utils"))]
981 #[allow(clippy::panic, clippy::expect_used)]
982 pub fn field_at_path<'a>(&'a self, path: &[String]) -> &'a StructField {
983 fn find_ci<'a>(
984 mut fields: impl Iterator<Item = &'a StructField>,
985 name: &str,
986 ) -> &'a StructField {
987 let lowered = name.to_lowercase();
988 fields
989 .find(|f| f.name().to_lowercase() == lowered)
990 .unwrap_or_else(|| panic!("field '{name}' not found"))
991 }
992 let (first, rest) = path.split_first().expect("non-empty path");
993 let mut field = find_ci(self.fields(), first);
994 for seg in rest {
995 let DataType::Struct(s) = field.data_type() else {
996 panic!("expected struct at intermediate segment '{seg}'");
997 };
998 field = find_ci(s.fields(), seg);
999 }
1000 field
1001 }
1002
1003 pub fn field_names(&self) -> impl ExactSizeIterator<Item = &String> {
1005 self.fields.keys()
1006 }
1007
1008 pub fn num_fields(&self) -> usize {
1010 self.fields.len()
1012 }
1013
1014 #[allow(unused)] #[internal_api]
1022 pub(crate) fn total_struct_fields(&self) -> usize {
1023 fn count_data_type(dt: &DataType) -> usize {
1024 match dt {
1025 DataType::Struct(inner) => inner.total_struct_fields(),
1026 DataType::Array(array) => count_data_type(array.element_type()),
1027 DataType::Map(map) => {
1028 count_data_type(map.key_type()) + count_data_type(map.value_type())
1029 }
1030 _ => 0,
1031 }
1032 }
1033 self.fields()
1034 .map(|field| 1 + count_data_type(field.data_type()))
1035 .sum()
1036 }
1037
1038 pub fn metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&StructField> {
1040 self.metadata_columns
1041 .get(spec)
1042 .and_then(|index| self.fields.get_index(*index).map(|(_, field)| field))
1043 }
1044
1045 pub fn metadata_columns(&self) -> impl Iterator<Item = &StructField> {
1047 self.metadata_columns
1048 .values()
1049 .filter_map(|index| self.fields.get_index(*index).map(|(_, field)| field))
1050 }
1051
1052 #[allow(unused)]
1059 #[internal_api]
1060 pub(crate) fn leaves<'s>(&self, own_name: impl Into<Option<&'s str>>) -> ColumnNamesAndTypes {
1061 let mut get_leaves = GetSchemaLeaves::new(own_name.into());
1062 get_leaves.transform_struct(self);
1063 (get_leaves.names, get_leaves.types).into()
1064 }
1065
1066 #[internal_api]
1073 pub(crate) fn make_physical(
1074 &self,
1075 column_mapping_mode: ColumnMappingMode,
1076 ) -> DeltaResult<Self> {
1077 let mut transformer = MakePhysical::new(column_mapping_mode);
1078 transformer.transform_struct(self).map(|s| s.into_owned())
1079 }
1080
1081 pub(crate) fn ensure_no_metadata_columns(
1083 fields: &mut dyn Iterator<Item = &StructField>,
1084 ) -> DeltaResult<()> {
1085 for field in fields {
1086 Self::ensure_no_metadata_columns_in_field(field)?;
1087 }
1088 Ok(())
1089 }
1090
1091 pub(crate) fn ensure_no_metadata_columns_in_field(field: &StructField) -> DeltaResult<()> {
1093 if field.is_metadata_column() {
1094 return Err(Error::schema(
1095 "Metadata columns are only allowed at the top level of a schema".to_string(),
1096 ));
1097 }
1098
1099 match &field.data_type {
1100 DataType::Struct(struct_type) => {
1101 Self::ensure_no_metadata_columns(&mut struct_type.fields().filter(|f| {
1103 !matches!(f.data_type, DataType::Struct(_) | DataType::Variant(_))
1104 }))?;
1105 }
1106 DataType::Array(array_type) => {
1107 if let DataType::Struct(struct_type) = array_type.element_type() {
1108 Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1109 }
1110 }
1111 DataType::Map(map_type) => {
1112 if let DataType::Struct(struct_type) = map_type.key_type() {
1113 Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1114 }
1115 if let DataType::Struct(struct_type) = map_type.value_type() {
1116 Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1117 }
1118 }
1119 DataType::Primitive(_) | DataType::Variant(_) => {}
1122 };
1123
1124 Ok(())
1125 }
1126
1127 pub fn with_field_inserted_after(
1132 mut self,
1133 after: Option<&str>,
1134 new_field: StructField,
1135 ) -> DeltaResult<Self> {
1136 if self.fields.contains_key(&new_field.name) {
1141 return Err(Error::generic(format!(
1142 "Field {} already exists",
1143 new_field.name
1144 )));
1145 }
1146
1147 let insert_index = after
1148 .map(|after| {
1149 self.fields
1150 .get_index_of(after)
1151 .map(|index| index + 1)
1152 .ok_or_else(|| Error::generic(format!("Field {after} not found")))
1153 })
1154 .unwrap_or_else(|| Ok(self.fields.len()))?;
1155
1156 self.fields
1157 .insert_before(insert_index, new_field.name.clone(), new_field);
1158 Ok(self)
1159 }
1160
1161 pub fn with_field_inserted_before(
1166 mut self,
1167 before: Option<&str>,
1168 new_field: StructField,
1169 ) -> DeltaResult<Self> {
1170 if self.fields.contains_key(&new_field.name) {
1174 return Err(Error::generic(format!(
1175 "Field {} already exists",
1176 new_field.name
1177 )));
1178 }
1179
1180 let index_of_before = before
1181 .map(|before| {
1182 self.fields
1183 .get_index_of(before)
1184 .ok_or_else(|| Error::generic(format!("Field {before} not found")))
1185 })
1186 .unwrap_or_else(|| Ok(0))?;
1187
1188 self.fields
1189 .insert_before(index_of_before, new_field.name.clone(), new_field);
1190 Ok(self)
1191 }
1192
1193 pub fn with_field_removed(mut self, name: &str) -> Self {
1196 self.fields.shift_remove(name);
1197 self
1198 }
1199
1200 pub fn with_fields_filtered(
1203 &self,
1204 predicate: impl Fn(&StructField) -> bool,
1205 ) -> DeltaResult<Self> {
1206 Self::try_new(self.fields().filter(|f| predicate(f)).cloned())
1207 }
1208
1209 pub fn with_fields_filtered_nonempty(
1215 &self,
1216 predicate: impl Fn(&StructField) -> bool,
1217 ) -> DeltaResult<Option<Self>> {
1218 let filtered = self.with_fields_filtered(predicate)?;
1219 if filtered.num_fields() == 0 {
1220 Ok(None)
1221 } else {
1222 Ok(Some(filtered))
1223 }
1224 }
1225
1226 pub fn with_field_replaced(
1229 mut self,
1230 name: &str,
1231 new_field: StructField,
1232 ) -> DeltaResult<StructType> {
1233 let replace_field = self
1234 .fields
1235 .get_mut(name)
1236 .ok_or_else(|| Error::generic(format!("Field {name} not found")))?;
1237
1238 *replace_field = new_field;
1239 Ok(self)
1240 }
1241}
1242
/// Writes the tree-drawing prefix for one line of a struct's `Display` output.
///
/// `levels` holds one entry per nesting depth, outermost first. Each entry records whether the
/// field at that depth is the last one among its siblings.
fn write_indent(f: &mut Formatter<'_>, levels: &[bool]) -> std::fmt::Result {
    let mut it = levels.iter().peekable();

    while let Some(is_last) = it.next() {
        if it.peek().is_none() {
            // Deepest level: draw the branch connector for the field on this line.
            write!(f, "{}", if *is_last { "└─" } else { "├─" })?;
        }
        else {
            // Ancestor level: draw blank space if that ancestor was the last sibling; otherwise
            // draw a vertical line continuing down to its later siblings.
            write!(f, "{}", if *is_last { " " } else { "│ " })?;
        }
    }

    Ok(())
}
1259
1260fn write_struct_type(
1261 st: &StructType,
1262 f: &mut Formatter<'_>,
1263 levels: &mut Vec<bool>,
1264) -> std::fmt::Result {
1265 let len = st.fields.len();
1266
1267 for (i, (_, field)) in st.fields.iter().enumerate() {
1268 let is_last = i + 1 == len;
1269 levels.push(is_last);
1270
1271 write_indent(f, levels)?;
1272 writeln!(f, "{field}")?;
1273
1274 field.data_type.fmt_recursive(f, levels)?;
1275
1276 levels.pop();
1277 }
1278 Ok(())
1279}
1280
1281impl Display for StructType {
1282 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1283 writeln!(f, "{}:", self.type_name)?;
1284 let mut levels = Vec::new();
1285 write_struct_type(self, f, &mut levels)
1286 }
1287}
1288
1289impl IntoIterator for StructType {
1290 type Item = StructField;
1291 type IntoIter = StructFieldIntoIter;
1292
1293 fn into_iter(self) -> Self::IntoIter {
1294 StructFieldIntoIter {
1295 inner: self.fields.into_values(),
1296 }
1297 }
1298}
1299
1300impl<'a> IntoIterator for &'a StructType {
1301 type Item = &'a StructField;
1302 type IntoIter = StructFieldRefIter<'a>;
1303
1304 fn into_iter(self) -> Self::IntoIter {
1305 StructFieldRefIter {
1306 inner: self.fields.values(),
1307 }
1308 }
1309}
1310
1311#[derive(Debug)]
1339pub struct StructFieldIntoIter {
1340 inner: indexmap::map::IntoValues<String, StructField>,
1341}
1342
1343impl Iterator for StructFieldIntoIter {
1344 type Item = StructField;
1345
1346 fn next(&mut self) -> Option<Self::Item> {
1347 self.inner.next()
1348 }
1349
1350 fn size_hint(&self) -> (usize, Option<usize>) {
1351 self.inner.size_hint()
1352 }
1353
1354 fn count(self) -> usize {
1355 self.inner.count()
1356 }
1357
1358 fn last(self) -> Option<Self::Item> {
1359 self.inner.last()
1360 }
1361
1362 fn nth(&mut self, n: usize) -> Option<Self::Item> {
1363 self.inner.nth(n)
1364 }
1365}
1366
1367impl ExactSizeIterator for StructFieldIntoIter {
1368 fn len(&self) -> usize {
1369 self.inner.len()
1370 }
1371}
1372
1373impl FusedIterator for StructFieldIntoIter {}
1374
1375impl DoubleEndedIterator for StructFieldIntoIter {
1376 fn next_back(&mut self) -> Option<Self::Item> {
1377 self.inner.next_back()
1378 }
1379}
1380
1381#[derive(Debug, Clone)]
1422pub struct StructFieldRefIter<'a> {
1423 inner: indexmap::map::Values<'a, String, StructField>,
1424}
1425
1426impl<'a> Iterator for StructFieldRefIter<'a> {
1427 type Item = &'a StructField;
1428
1429 fn next(&mut self) -> Option<Self::Item> {
1430 self.inner.next()
1431 }
1432
1433 fn size_hint(&self) -> (usize, Option<usize>) {
1434 self.inner.size_hint()
1435 }
1436}
1437
1438impl ExactSizeIterator for StructFieldRefIter<'_> {
1439 fn len(&self) -> usize {
1440 self.inner.len()
1441 }
1442}
1443
1444impl FusedIterator for StructFieldRefIter<'_> {}
1445
1446impl DoubleEndedIterator for StructFieldRefIter<'_> {
1447 fn next_back(&mut self) -> Option<Self::Item> {
1448 self.inner.next_back()
1449 }
1450}
1451
1452struct InvariantChecker;
1453
1454impl<'a> SchemaTransform<'a> for InvariantChecker {
1455 transform_output_type!(|'a, T| Result<(), ()>);
1456
1457 fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1458 if field.has_invariants() {
1459 Err(())
1460 } else {
1461 self.recurse_into_struct_field(field)
1462 }
1463 }
1464}
1465
1466pub(crate) fn schema_has_invariants(schema: &Schema) -> bool {
1471 InvariantChecker.transform_struct(schema).is_err()
1472}
1473
1474struct NonNullFieldChecker;
1477
1478impl<'a> SchemaTransform<'a> for NonNullFieldChecker {
1479 transform_output_type!(|'a, T| Result<(), ()>);
1480
1481 fn transform_variant(&mut self, _stype: &'a StructType) -> Result<(), ()> {
1485 Ok(())
1486 }
1487
1488 fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1489 if !field.is_nullable() {
1490 return Err(());
1491 }
1492
1493 self.recurse_into_struct_field(field)
1494 }
1495}
1496
1497pub(crate) fn schema_contains_non_null_fields(schema: &Schema) -> bool {
1502 NonNullFieldChecker.transform_struct(schema).is_err()
1503}
1504
1505pub(crate) fn normalize_column_names_to_schema_casing(
1517 schema: &StructType,
1518 columns: &[ColumnName],
1519) -> Vec<ColumnName> {
1520 columns
1521 .iter()
1522 .map(|col| {
1523 let path = col.path();
1524 let mut normalized: Vec<String> = Vec::with_capacity(path.len());
1525 let mut current_schema = schema;
1526 for (i, field_name) in path.iter().enumerate() {
1527 match current_schema
1528 .fields()
1529 .find(|f| f.name().eq_ignore_ascii_case(field_name))
1530 {
1531 Some(f) => {
1532 normalized.push(f.name().to_string());
1533 if let DataType::Struct(inner) = f.data_type() {
1534 current_schema = inner;
1535 }
1536 }
1537 None => {
1538 normalized.extend(path[i..].iter().cloned());
1541 break;
1542 }
1543 }
1544 }
1545 ColumnName::new(normalized.iter().map(|s| s.as_str()))
1546 })
1547 .collect()
1548}
1549
1550#[internal_api]
1552#[derive(Clone, Default)]
1553pub(crate) struct ColumnNamesAndTypes(Vec<ColumnName>, Vec<DataType>);
1554impl ColumnNamesAndTypes {
1555 #[internal_api]
1556 pub(crate) fn as_ref(&self) -> (&[ColumnName], &[DataType]) {
1557 (&self.0, &self.1)
1558 }
1559}
1560
1561impl From<(Vec<ColumnName>, Vec<DataType>)> for ColumnNamesAndTypes {
1562 fn from((names, fields): (Vec<ColumnName>, Vec<DataType>)) -> Self {
1563 ColumnNamesAndTypes(names, fields)
1564 }
1565}
1566
1567#[derive(Debug, Deserialize, Serialize)]
1568#[serde(rename_all = "camelCase")]
1569struct StructTypeSerDeHelper {
1570 #[serde(rename = "type")]
1571 type_name: String,
1572 fields: Vec<StructField>,
1573}
1574
1575impl Serialize for StructType {
1576 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1577 where
1578 S: serde::Serializer,
1579 {
1580 StructTypeSerDeHelper {
1581 type_name: self.type_name.clone(),
1582 fields: self.fields.values().cloned().collect(),
1583 }
1584 .serialize(serializer)
1585 }
1586}
1587
1588impl<'de> Deserialize<'de> for StructType {
1589 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1590 where
1591 D: serde::Deserializer<'de>,
1592 Self: Sized,
1593 {
1594 let helper = StructTypeSerDeHelper::deserialize(deserializer)?;
1595 StructType::try_new(helper.fields).map_err(serde::de::Error::custom)
1596 }
1597}
1598
/// An array type: a sequence of `element_type` values, where `contains_null` records whether
/// elements may be null.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ArrayType {
    /// Serialized as the JSON `"type"` key; [`ArrayType::new`] sets it to `"array"`.
    #[serde(rename = "type")]
    pub type_name: String,
    /// Type of each element (JSON key `elementType`).
    pub element_type: DataType,
    /// Whether elements may be null (JSON key `containsNull`).
    pub contains_null: bool,
}
1609
1610impl ArrayType {
1611 pub fn new(element_type: DataType, contains_null: bool) -> Self {
1612 Self {
1613 type_name: "array".into(),
1614 element_type,
1615 contains_null,
1616 }
1617 }
1618
1619 #[inline]
1620 pub const fn element_type(&self) -> &DataType {
1621 &self.element_type
1622 }
1623
1624 #[inline]
1625 pub const fn contains_null(&self) -> bool {
1626 self.contains_null
1627 }
1628}
1629
/// A map type from `key_type` to `value_type`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MapType {
    /// Serialized as the JSON `"type"` key; [`MapType::new`] sets it to `"map"`.
    #[serde(rename = "type")]
    pub type_name: String,
    /// Type of the map's keys (JSON key `keyType`).
    pub key_type: DataType,
    /// Type of the map's values (JSON key `valueType`).
    pub value_type: DataType,
    /// Whether values may be null (JSON key `valueContainsNull`). Defaults to `true` when the key
    /// is absent from the input JSON.
    #[serde(default = "default_true")]
    pub value_contains_null: bool,
}
1643
1644impl MapType {
1645 pub fn new(
1646 key_type: impl Into<DataType>,
1647 value_type: impl Into<DataType>,
1648 value_contains_null: bool,
1649 ) -> Self {
1650 Self {
1651 type_name: "map".into(),
1652 key_type: key_type.into(),
1653 value_type: value_type.into(),
1654 value_contains_null,
1655 }
1656 }
1657
1658 #[inline]
1659 pub const fn key_type(&self) -> &DataType {
1660 &self.key_type
1661 }
1662
1663 #[inline]
1664 pub const fn value_type(&self) -> &DataType {
1665 &self.value_type
1666 }
1667
1668 #[inline]
1669 pub const fn value_contains_null(&self) -> bool {
1670 self.value_contains_null
1671 }
1672
1673 pub fn as_struct_schema(&self, key_name: String, val_name: String) -> Schema {
1676 StructType::new_unchecked([
1677 StructField::not_null(key_name, self.key_type.clone()),
1678 StructField::new(val_name, self.value_type.clone(), self.value_contains_null),
1679 ])
1680 }
1681}
1682
/// Serde default provider that always yields `true`; referenced by
/// `#[serde(default = "default_true")]` on `MapType::value_contains_null`.
fn default_true() -> bool {
    true
}
1686
/// A decimal type described by its `precision` and `scale`.
///
/// [`DecimalType::try_new`] enforces `1 <= precision <= 38` and `scale <= precision`.
///
/// NOTE(review): the derived `Deserialize` builds the struct straight from its fields and does
/// not run those checks. Confirm nothing deserializes a `DecimalType` directly from untrusted
/// input; the `PrimitiveType` string parser does go through `try_new`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct DecimalType {
    precision: u8,
    scale: u8,
}
1692
1693impl DecimalType {
1694 pub fn try_new(precision: u8, scale: u8) -> DeltaResult<Self> {
1696 require!(
1697 0 < precision && precision <= 38,
1698 Error::invalid_decimal(format!(
1699 "precision must be in range 1..38 inclusive, found: {precision}."
1700 ))
1701 );
1702 require!(
1703 scale <= precision,
1704 Error::invalid_decimal(format!(
1705 "scale must be in range 0..{precision} inclusive, found: {scale}."
1706 ))
1707 );
1708 Ok(Self { precision, scale })
1709 }
1710
1711 pub fn precision(&self) -> u8 {
1712 self.precision
1713 }
1714
1715 pub fn scale(&self) -> u8 {
1716 self.scale
1717 }
1718}
1719
/// A Delta primitive (non-nested) data type.
///
/// Serialization naming:
/// - Most variants use the names produced by `rename_all = "camelCase"` (e.g. `"long"`).
/// - `TimestampNtz` and `TimestampNanos` are explicitly renamed to `"timestamp_ntz"` and
///   `"timestamp_nanos"`.
/// - `Decimal` is written as `"decimal(<precision>,<scale>)"` by `serialize_decimal`.
///
/// Deserialization is hand-written in a separate `Deserialize` impl.
#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
#[serde(rename_all = "camelCase")]
pub enum PrimitiveType {
    String,
    Long,
    Integer,
    Short,
    Byte,
    Float,
    Double,
    Boolean,
    Binary,
    Date,
    Timestamp,
    // Explicit rename: camelCase would otherwise produce "timestampNtz".
    #[serde(rename = "timestamp_ntz")]
    TimestampNtz,
    // Only available when the `nanosecond-timestamps` feature is enabled.
    #[cfg(feature = "nanosecond-timestamps")]
    #[serde(rename = "timestamp_nanos")]
    TimestampNanos,
    // Untagged with a custom serializer so the output is the bare "decimal(p,s)" string.
    #[serde(serialize_with = "serialize_decimal", untagged)]
    Decimal(DecimalType),
}
1751
1752impl PrimitiveType {
1753 pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
1754 Ok(DecimalType::try_new(precision, scale)?.into())
1755 }
1756
1757 #[internal_api]
1766 pub(crate) fn can_widen_to(&self, target: &Self) -> bool {
1767 use PrimitiveType::*;
1768 matches!(
1769 (self, target),
1770 (Byte, Short | Integer | Long)
1772 | (Short, Integer | Long)
1773 | (Integer, Long)
1774 | (Float, Double)
1776 | (Timestamp, TimestampNtz)
1780 | (TimestampNtz, Timestamp)
1781 )
1782 }
1783
1784 pub(crate) fn is_checkpoint_cast_compatible(&self, target: &Self) -> bool {
1797 matches!(
1798 (self, target),
1799 (Self::Integer, Self::Date) | (Self::Long, Self::Timestamp | Self::TimestampNtz)
1800 )
1801 }
1802
1803 pub(crate) fn is_stats_type_compatible_with(&self, target: &Self) -> bool {
1813 self == target || self.can_widen_to(target) || self.is_checkpoint_cast_compatible(target)
1814 }
1815}
1816
1817fn serialize_decimal<S: serde::Serializer>(
1818 dtype: &DecimalType,
1819 serializer: S,
1820) -> Result<S::Ok, S::Error> {
1821 serializer.serialize_str(&format!("decimal({},{})", dtype.precision(), dtype.scale()))
1822}
1823
1824fn serialize_variant<S: serde::Serializer>(
1825 _: &StructType,
1826 serializer: S,
1827) -> Result<S::Ok, S::Error> {
1828 serializer.serialize_str("variant")
1829}
1830
1831impl<'de> serde::Deserialize<'de> for PrimitiveType {
1835 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1836 where
1837 D: serde::Deserializer<'de>,
1838 {
1839 let str_value = String::deserialize(deserializer)?;
1840
1841 match str_value.as_str() {
1842 "string" => Ok(PrimitiveType::String),
1843 "long" => Ok(PrimitiveType::Long),
1844 "integer" => Ok(PrimitiveType::Integer),
1845 "short" => Ok(PrimitiveType::Short),
1846 "byte" => Ok(PrimitiveType::Byte),
1847 "float" => Ok(PrimitiveType::Float),
1848 "double" => Ok(PrimitiveType::Double),
1849 "boolean" => Ok(PrimitiveType::Boolean),
1850 "binary" => Ok(PrimitiveType::Binary),
1851 "date" => Ok(PrimitiveType::Date),
1852 "timestamp" => Ok(PrimitiveType::Timestamp),
1853 "timestamp_ntz" => Ok(PrimitiveType::TimestampNtz),
1854 #[cfg(feature = "nanosecond-timestamps")]
1855 "timestamp_nanos" => Ok(PrimitiveType::TimestampNanos),
1856 decimal_str if decimal_str.starts_with("decimal(") && decimal_str.ends_with(')') => {
1857 let mut parts = decimal_str[8..decimal_str.len() - 1].split(',');
1859 let precision = parts
1860 .next()
1861 .and_then(|part| part.trim().parse::<u8>().ok())
1862 .ok_or_else(|| {
1863 serde::de::Error::custom(format!(
1864 "Invalid precision in decimal: {decimal_str}"
1865 ))
1866 })?;
1867 let scale = parts
1868 .next()
1869 .and_then(|part| part.trim().parse::<u8>().ok())
1870 .ok_or_else(|| {
1871 serde::de::Error::custom(format!("Invalid scale in decimal: {decimal_str}"))
1872 })?;
1873 if parts.next().is_some() {
1875 return Err(serde::de::Error::custom(format!(
1876 "Invalid decimal format (expected 2 parts): {decimal_str}"
1877 )));
1878 }
1879 DecimalType::try_new(precision, scale)
1880 .map(PrimitiveType::Decimal)
1881 .map_err(serde::de::Error::custom)
1882 }
1883 unsupported => Err(serde::de::Error::custom(format!(
1884 "Unsupported Delta table type: '{unsupported}'"
1885 ))),
1886 }
1887 }
1888}
1889
1890impl Display for PrimitiveType {
1891 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1892 match self {
1893 PrimitiveType::String => write!(f, "string"),
1894 PrimitiveType::Long => write!(f, "long"),
1895 PrimitiveType::Integer => write!(f, "integer"),
1896 PrimitiveType::Short => write!(f, "short"),
1897 PrimitiveType::Byte => write!(f, "byte"),
1898 PrimitiveType::Float => write!(f, "float"),
1899 PrimitiveType::Double => write!(f, "double"),
1900 PrimitiveType::Boolean => write!(f, "boolean"),
1901 PrimitiveType::Binary => write!(f, "binary"),
1902 PrimitiveType::Date => write!(f, "date"),
1903 PrimitiveType::Timestamp => write!(f, "timestamp"),
1904 PrimitiveType::TimestampNtz => write!(f, "timestamp_ntz"),
1905 #[cfg(feature = "nanosecond-timestamps")]
1906 PrimitiveType::TimestampNanos => write!(f, "timestamp_nanos"),
1907 PrimitiveType::Decimal(dtype) => {
1908 write!(f, "decimal({},{})", dtype.precision(), dtype.scale())
1909 }
1910 }
1911 }
1912}
1913
/// Any Delta data type: a primitive, a nested array/struct/map, or a variant.
///
/// Serialization is untagged:
/// - primitives are written as their type-name string;
/// - nested types are written in their own JSON object form;
/// - `Variant` is written as the bare string `"variant"` (via `serialize_variant`).
///
/// Deserialization is hand-written in a separate `Deserialize` impl.
#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
#[serde(untagged, rename_all = "camelCase")]
pub enum DataType {
    /// A primitive type such as `long` or `decimal(p,s)`.
    Primitive(PrimitiveType),
    /// An array type.
    Array(Box<ArrayType>),
    /// A struct type.
    Struct(Box<StructType>),
    /// A map type.
    Map(Box<MapType>),
    /// A variant type. The boxed struct holds its fields; `DataType::unshredded_variant` builds
    /// the default `metadata`/`value` pair.
    #[serde(serialize_with = "serialize_variant")]
    Variant(Box<StructType>),
}
1932
1933impl From<DecimalType> for PrimitiveType {
1934 fn from(dtype: DecimalType) -> Self {
1935 PrimitiveType::Decimal(dtype)
1936 }
1937}
1938impl From<DecimalType> for DataType {
1939 fn from(dtype: DecimalType) -> Self {
1940 PrimitiveType::from(dtype).into()
1941 }
1942}
1943impl From<PrimitiveType> for DataType {
1944 fn from(ptype: PrimitiveType) -> Self {
1945 DataType::Primitive(ptype)
1946 }
1947}
1948impl From<MapType> for DataType {
1949 fn from(map_type: MapType) -> Self {
1950 DataType::Map(Box::new(map_type))
1951 }
1952}
1953
1954impl From<StructType> for DataType {
1955 fn from(struct_type: StructType) -> Self {
1956 DataType::Struct(Box::new(struct_type))
1957 }
1958}
1959
1960impl From<ArrayType> for DataType {
1961 fn from(array_type: ArrayType) -> Self {
1962 DataType::Array(Box::new(array_type))
1963 }
1964}
1965
1966impl From<SchemaRef> for DataType {
1967 fn from(schema: SchemaRef) -> Self {
1968 Arc::unwrap_or_clone(schema).into()
1969 }
1970}
1971
1972impl<'de> serde::Deserialize<'de> for DataType {
1977 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1978 where
1979 D: serde::Deserializer<'de>,
1980 {
1981 use serde::de::Error;
1982 use serde_json::Value;
1983
1984 let value = Value::deserialize(deserializer)?;
1985
1986 if let Value::String(s) = &value {
1988 if s == "variant" {
1989 return match DataType::unshredded_variant() {
1990 DataType::Variant(st) => Ok(DataType::Variant(st)),
1991 _ => Err(Error::custom("Failed to create variant type")),
1992 };
1993 }
1994
1995 return PrimitiveType::deserialize(value.clone())
1997 .map(DataType::Primitive)
1998 .map_err(|e| Error::custom(e.to_string()));
1999 }
2000
2001 if let Value::Object(map) = &value {
2003 if let Some(Value::String(type_str)) = map.get("type") {
2004 return match type_str.as_str() {
2005 "array" => ArrayType::deserialize(value)
2006 .map(|at| DataType::Array(Box::new(at)))
2007 .map_err(|e| Error::custom(e.to_string())),
2008 "struct" => StructType::deserialize(value)
2009 .map(|st| DataType::Struct(Box::new(st)))
2010 .map_err(|e| Error::custom(e.to_string())),
2011 "map" => MapType::deserialize(value)
2012 .map(|mt| DataType::Map(Box::new(mt)))
2013 .map_err(|e| Error::custom(e.to_string())),
2014 _ => Err(Error::custom(format!("Unknown complex type: '{type_str}'"))),
2015 };
2016 }
2017 }
2018
2019 Err(Error::custom(format!(
2021 "Invalid data type: {}",
2022 serde_json::to_string(&value).unwrap_or_else(|_| format!("{value:?}"))
2023 )))
2024 }
2025}
2026
2027impl DataType {
2029 pub const STRING: Self = DataType::Primitive(PrimitiveType::String);
2030 pub const LONG: Self = DataType::Primitive(PrimitiveType::Long);
2031 pub const INTEGER: Self = DataType::Primitive(PrimitiveType::Integer);
2032 pub const SHORT: Self = DataType::Primitive(PrimitiveType::Short);
2033 pub const BYTE: Self = DataType::Primitive(PrimitiveType::Byte);
2034 pub const FLOAT: Self = DataType::Primitive(PrimitiveType::Float);
2035 pub const DOUBLE: Self = DataType::Primitive(PrimitiveType::Double);
2036 pub const BOOLEAN: Self = DataType::Primitive(PrimitiveType::Boolean);
2037 pub const BINARY: Self = DataType::Primitive(PrimitiveType::Binary);
2038 pub const DATE: Self = DataType::Primitive(PrimitiveType::Date);
2039 pub const TIMESTAMP: Self = DataType::Primitive(PrimitiveType::Timestamp);
2040 pub const TIMESTAMP_NTZ: Self = DataType::Primitive(PrimitiveType::TimestampNtz);
2041 #[cfg(feature = "nanosecond-timestamps")]
2042 pub const TIMESTAMP_NANOS: Self = DataType::Primitive(PrimitiveType::TimestampNanos);
2043 pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
2045 Ok(PrimitiveType::decimal(precision, scale)?.into())
2046 }
2047
2048 pub fn try_struct_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2050 Ok(StructType::try_new(fields)?.into())
2051 }
2052
2053 pub fn try_struct_type_from_results<E: Into<Error>>(
2055 fields: impl IntoIterator<Item = Result<StructField, E>>,
2056 ) -> DeltaResult<Self> {
2057 StructType::try_from_results(fields).map(Self::from)
2058 }
2059
2060 pub(crate) fn struct_type_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
2062 StructType::new_unchecked(fields).into()
2063 }
2064
2065 pub fn unshredded_variant() -> Self {
2068 DataType::Variant(Box::new(StructType::new_unchecked([
2069 StructField::not_null("metadata", DataType::BINARY),
2070 StructField::not_null("value", DataType::BINARY),
2071 ])))
2072 }
2073
2074 pub fn variant_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2077 Ok(DataType::Variant(Box::new(StructType::try_from_results(
2080 fields.into_iter().map(|field| {
2081 if field.is_metadata_column() {
2082 Err(Error::schema(
2083 "Metadata columns are not allowed in Variant types".to_string(),
2084 ))
2085 } else {
2086 Ok(field)
2087 }
2088 }),
2089 )?)))
2090 }
2091
2092 pub fn as_primitive_opt(&self) -> Option<&PrimitiveType> {
2095 match self {
2096 DataType::Primitive(ptype) => Some(ptype),
2097 _ => None,
2098 }
2099 }
2100
2101 fn fmt_recursive(&self, f: &mut Formatter<'_>, levels: &mut Vec<bool>) -> std::fmt::Result {
2102 match self {
2103 DataType::Struct(inner) => write_struct_type(inner, f, levels),
2104
2105 DataType::Array(inner) => {
2106 levels.push(true); write_indent(f, levels)?;
2108 writeln!(f, "array_element: {}", inner.element_type)?;
2109 inner.element_type.fmt_recursive(f, levels)?;
2110 levels.pop();
2111 Ok(())
2112 }
2113
2114 DataType::Map(inner) => {
2115 levels.push(false); write_indent(f, levels)?;
2118 writeln!(f, "map_key: {}", inner.key_type)?;
2119 inner.key_type.fmt_recursive(f, levels)?;
2120 levels.pop();
2121
2122 levels.push(true); write_indent(f, levels)?;
2125 writeln!(f, "map_value: {}", inner.value_type)?;
2126 inner.value_type.fmt_recursive(f, levels)?;
2127 levels.pop();
2128 Ok(())
2129 }
2130
2131 _ => Ok(()),
2132 }
2133 }
2134}
2135
2136impl Display for DataType {
2137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2138 match self {
2139 DataType::Primitive(p) => write!(f, "{p}"),
2140 DataType::Array(a) => write!(f, "array<{}>", a.element_type),
2141 DataType::Struct(s) => {
2142 write!(f, "struct<")?;
2143 for (i, field) in s.fields().enumerate() {
2144 if i > 0 {
2145 write!(f, ", ")?;
2146 }
2147 write!(f, "{}: {}", field.name, field.data_type)?;
2148 }
2149 write!(f, ">")
2150 }
2151 DataType::Map(m) => write!(f, "map<{}, {}>", m.key_type, m.value_type),
2152 DataType::Variant(_) => write!(f, "variant"),
2153 }
2154 }
2155}
2156
2157struct GetSchemaLeaves {
2158 path: Vec<String>,
2159 names: Vec<ColumnName>,
2160 types: Vec<DataType>,
2161}
2162impl GetSchemaLeaves {
2163 fn new(own_name: Option<&str>) -> Self {
2164 Self {
2165 path: own_name.into_iter().map(|s| s.to_string()).collect(),
2166 names: vec![],
2167 types: vec![],
2168 }
2169 }
2170}
2171
2172impl<'a> SchemaTransform<'a> for GetSchemaLeaves {
2173 transform_output_type!(|'a, T| ());
2174
2175 fn transform_struct_field(&mut self, field: &'a StructField) {
2176 self.path.push(field.name.clone());
2177 if let DataType::Struct(_) = field.data_type {
2178 self.recurse_into_struct_field(field);
2179 } else {
2180 self.names.push(ColumnName::new(&self.path));
2181 self.types.push(field.data_type.clone());
2182 }
2183 self.path.pop();
2184 }
2185}
2186
/// Schema transform that converts logical fields into their physical form under a given column
/// mapping mode, validating column mapping annotations along the way.
struct MakePhysical<'a> {
    /// Column mapping mode used for validation and for the physical names/metadata produced.
    column_mapping_mode: ColumnMappingMode,
    /// Logical names of the fields being visited, outermost first. Placeholders such as
    /// `<array element>` stand in for unnamed nested positions; the path appears in error
    /// messages.
    logical_path: Vec<&'a str>,
    /// Column mapping IDs seen so far across the whole schema.
    /// NOTE(review): the value is presumably the name of the field that first claimed the ID —
    /// confirm in `validate_and_extract_column_mapping_annotations`.
    seen_ids: HashMap<i64, &'a str>,
    /// One map per struct currently being visited, innermost last.
    /// NOTE(review): presumably maps physical name -> logical name among siblings, to detect
    /// physical-name collisions — confirm.
    sibling_names_stack: Vec<HashMap<&'a str, &'a str>>,
}
2200impl<'a> MakePhysical<'a> {
2201 fn new(column_mapping_mode: ColumnMappingMode) -> Self {
2202 Self {
2203 column_mapping_mode,
2204 logical_path: vec![],
2205 seen_ids: HashMap::new(),
2206 sibling_names_stack: vec![],
2207 }
2208 }
2209
2210 fn transform_inner<T>(
2211 &mut self,
2212 logical_name: &'a str,
2213 transform: impl FnOnce(&mut Self) -> DeltaResult<T>,
2214 ) -> DeltaResult<T> {
2215 self.logical_path.push(logical_name);
2216 let result = transform(self);
2217 self.logical_path.pop();
2218 result
2219 }
2220}
impl<'a> SchemaTransform<'a> for MakePhysical<'a> {
    transform_output_type!(|'a, T| DeltaResult<Cow<'a, T>>);

    // Each struct level gets a fresh sibling-name map, so fields are only compared with siblings
    // in the same struct. The map is discarded once that struct has been walked.
    fn transform_struct(&mut self, stype: &'a StructType) -> DeltaResult<Cow<'a, StructType>> {
        self.sibling_names_stack.push(HashMap::new());
        let result = self.recurse_into_struct(stype);
        self.sibling_names_stack.pop();
        result
    }

    // Array elements and map keys/values have no field name of their own. While they are walked,
    // a placeholder segment is pushed onto the logical path so error paths stay precise.
    fn transform_array_element(&mut self, etype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
        self.transform_inner("<array element>", |this| this.transform(etype))
    }
    fn transform_map_key(&mut self, ktype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
        self.transform_inner("<map key>", |this| this.transform(ktype))
    }
    fn transform_map_value(&mut self, vtype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
        self.transform_inner("<map value>", |this| this.transform(vtype))
    }
    fn transform_struct_field(
        &mut self,
        field: &'a StructField,
    ) -> DeltaResult<Cow<'a, StructField>> {
        // Validate this field's column mapping annotations for the active mode. The call also
        // receives the schema-wide ID map and the current struct level's sibling-name map —
        // presumably so it can record this field and reject duplicate IDs / colliding physical
        // names (confirm in the helper).
        let (physical_name, _id) = validate_and_extract_column_mapping_annotations(
            field,
            self.column_mapping_mode,
            &self.logical_path,
            Some(&mut self.seen_ids),
            self.sibling_names_stack.last_mut(),
        )?;

        // Metadata columns are still validated above, but are otherwise returned untouched:
        // no rename, no metadata rewrite, no recursion into their type.
        if field.is_metadata_column() {
            return Ok(Cow::Borrowed(field));
        }

        // Transform children with this field's logical name on the path. Then rename the field to
        // its physical name and replace its metadata with the physical-side metadata.
        self.transform_inner(field.name(), |this| {
            let field = this.recurse_into_struct_field(field)?;

            let metadata = field.logical_to_physical_metadata(this.column_mapping_mode);
            let name = physical_name.to_owned();

            Ok(Cow::Owned(field.with_name(name).with_metadata(metadata)))
        })
    }

    // Variant types are returned as-is; the struct inside them is not walked or renamed.
    fn transform_variant(&mut self, stype: &'a StructType) -> DeltaResult<Cow<'a, StructType>> {
        Ok(Cow::Borrowed(stype))
    }
}
2272
2273#[cfg(test)]
2274mod tests {
2275 use rstest::rstest;
2276 use serde_json;
2277
2278 use super::*;
2279 use crate::table_features::ColumnMappingMode;
2280 use crate::utils::test_utils::{
2281 assert_result_error_with_message, column_mapping_physical_name_dedup_fixtures as fixtures,
2282 test_deep_nested_schema_missing_leaf_cm,
2283 };
2284
2285 fn example_schema_metadata() -> &'static str {
2286 r#"
2287 {
2288 "name": "e",
2289 "type": {
2290 "type": "array",
2291 "elementType": {
2292 "type": "struct",
2293 "fields": [
2294 {
2295 "name": "d",
2296 "type": "integer",
2297 "nullable": false,
2298 "metadata": {
2299 "delta.columnMapping.id": 5,
2300 "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
2301 }
2302 }
2303 ]
2304 },
2305 "containsNull": true
2306 },
2307 "nullable": true,
2308 "metadata": {
2309 "delta.columnMapping.id": 4,
2310 "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1",
2311 "delta.identity.start": 2147483648
2312 }
2313 }"#
2314 }
2315
2316 #[test]
2317 fn test_serde_data_types() {
2318 let data = r#"
2319 {
2320 "name": "a",
2321 "type": "integer",
2322 "nullable": false,
2323 "metadata": {}
2324 }
2325 "#;
2326 let field: StructField = serde_json::from_str(data).unwrap();
2327 assert!(matches!(field.data_type, DataType::INTEGER));
2328
2329 let data = r#"
2330 {
2331 "name": "c",
2332 "type": {
2333 "type": "array",
2334 "elementType": "integer",
2335 "containsNull": false
2336 },
2337 "nullable": true,
2338 "metadata": {}
2339 }
2340 "#;
2341 let field: StructField = serde_json::from_str(data).unwrap();
2342 assert!(matches!(field.data_type, DataType::Array(_)));
2343
2344 let data = r#"
2345 {
2346 "name": "e",
2347 "type": {
2348 "type": "array",
2349 "elementType": {
2350 "type": "struct",
2351 "fields": [
2352 {
2353 "name": "d",
2354 "type": "integer",
2355 "nullable": false,
2356 "metadata": {}
2357 }
2358 ]
2359 },
2360 "containsNull": true
2361 },
2362 "nullable": true,
2363 "metadata": {}
2364 }
2365 "#;
2366 let field: StructField = serde_json::from_str(data).unwrap();
2367 assert!(matches!(field.data_type, DataType::Array(_)));
2368 match field.data_type {
2369 DataType::Array(array) => assert!(matches!(array.element_type, DataType::Struct(_))),
2370 _ => unreachable!(),
2371 }
2372
2373 let data = r#"
2374 {
2375 "name": "f",
2376 "type": {
2377 "type": "map",
2378 "keyType": "string",
2379 "valueType": "string",
2380 "valueContainsNull": true
2381 },
2382 "nullable": true,
2383 "metadata": {}
2384 }
2385 "#;
2386 let field: StructField = serde_json::from_str(data).unwrap();
2387 assert!(matches!(field.data_type, DataType::Map(_)));
2388 }
2389
2390 #[test]
2391 fn test_roundtrip_decimal() {
2392 let data = r#"
2393 {
2394 "name": "a",
2395 "type": "decimal(10, 2)",
2396 "nullable": false,
2397 "metadata": {}
2398 }
2399 "#;
2400 let field: StructField = serde_json::from_str(data).unwrap();
2401 assert_eq!(field.data_type, DataType::decimal(10, 2).unwrap());
2402
2403 let json_str = serde_json::to_string(&field).unwrap();
2404 assert_eq!(
2405 json_str,
2406 r#"{"name":"a","type":"decimal(10,2)","nullable":false,"metadata":{}}"#
2407 );
2408 }
2409
2410 #[test]
2411 fn test_roundtrip_variant() {
2412 let data = r#"
2413 {
2414 "name": "v",
2415 "type": "variant",
2416 "nullable": false,
2417 "metadata": {}
2418 }
2419 "#;
2420 let field: StructField = serde_json::from_str(data).unwrap();
2421 assert_eq!(field.data_type, DataType::unshredded_variant());
2422
2423 let json_str = serde_json::to_string(&field).unwrap();
2424 assert_eq!(
2425 json_str,
2426 r#"{"name":"v","type":"variant","nullable":false,"metadata":{}}"#
2427 );
2428 }
2429
2430 #[test]
2431 fn test_unshredded_variant() {
2432 let unshredded_variant_type = DataType::unshredded_variant();
2433
2434 match &unshredded_variant_type {
2435 DataType::Variant(struct_type) => {
2436 let fields: Vec<_> = struct_type.fields().collect();
2437 assert_eq!(fields.len(), 2);
2438
2439 assert_eq!(fields[0].name, "metadata");
2440 assert_eq!(fields[0].data_type, DataType::BINARY);
2441 assert!(!fields[0].nullable);
2442
2443 assert_eq!(fields[1].name, "value");
2444 assert_eq!(fields[1].data_type, DataType::BINARY);
2445 assert!(!fields[1].nullable);
2446 }
2447 _ => panic!("Expected DataType::Variant, got {unshredded_variant_type:?}"),
2448 }
2449 }
2450
2451 #[rstest]
2452 #[case("interval second")]
2453 #[case("interval day")]
2454 #[case("money")]
2455 fn test_unsupported_type_error_message(#[case] unsupported_type: &str) {
2456 let data = format!(
2457 r#"{{
2458 "name": "test_field",
2459 "type": "{unsupported_type}",
2460 "nullable": false,
2461 "metadata": {{}}
2462 }}"#
2463 );
2464 let result: Result<StructField, _> = serde_json::from_str(&data);
2465 assert!(result.is_err());
2466 let err = result.unwrap_err();
2467 let expected_msg = format!("Unsupported Delta table type: '{unsupported_type}'");
2468 assert!(
2469 err.to_string().contains(&expected_msg),
2470 "Expected error message about unsupported type '{unsupported_type}', got: {err}"
2471 );
2472 }
2473
2474 #[rstest]
2475 #[case("string", DataType::STRING)]
2476 #[case("long", DataType::LONG)]
2477 #[case("integer", DataType::INTEGER)]
2478 #[case("short", DataType::SHORT)]
2479 #[case("byte", DataType::BYTE)]
2480 #[case("float", DataType::FLOAT)]
2481 #[case("double", DataType::DOUBLE)]
2482 #[case("boolean", DataType::BOOLEAN)]
2483 #[case("binary", DataType::BINARY)]
2484 #[case("date", DataType::DATE)]
2485 #[case("timestamp", DataType::TIMESTAMP)]
2486 #[case("timestamp_ntz", DataType::TIMESTAMP_NTZ)]
2487 fn test_primitive_type_deserialization_still_works(
2488 #[case] type_str: &str,
2489 #[case] expected_type: DataType,
2490 ) {
2491 let data = format!(
2492 r#"{{
2493 "name": "test_field",
2494 "type": "{type_str}",
2495 "nullable": false,
2496 "metadata": {{}}
2497 }}"#
2498 );
2499 let field: StructField = serde_json::from_str(&data).unwrap();
2500 assert_eq!(field.data_type, expected_type);
2501 }
2502
2503 #[rstest]
2504 #[case(10, 2)]
2505 #[case(16, 4)]
2506 #[case(38, 10)]
2507 fn test_decimal_with_primitive_deserializer(#[case] precision: u8, #[case] scale: u8) {
2508 let data = format!(
2509 r#"{{
2510 "name": "test_decimal",
2511 "type": "decimal({precision},{scale})",
2512 "nullable": false,
2513 "metadata": {{}}
2514 }}"#
2515 );
2516 let field: StructField = serde_json::from_str(&data).unwrap();
2517 assert_eq!(
2518 field.data_type,
2519 DataType::decimal(precision, scale).unwrap()
2520 );
2521 }
2522
2523 #[rstest]
2524 #[case("decimal(invalid)", "Invalid precision in decimal")]
2525 #[case("decimal(10)", "Invalid scale in decimal")]
2526 #[case("decimal()", "Invalid precision in decimal")]
2527 #[case("decimal(10,2,99)", "Invalid decimal format (expected 2 parts)")]
2528 fn test_invalid_decimal_format(#[case] invalid_type: &str, #[case] expected_error: &str) {
2529 let data = format!(
2530 r#"{{
2531 "name": "invalid",
2532 "type": "{invalid_type}",
2533 "nullable": false,
2534 "metadata": {{}}
2535 }}"#
2536 );
2537 let result: Result<StructField, _> = serde_json::from_str(&data);
2538 assert!(result.is_err());
2539 let err = result.unwrap_err();
2540 assert!(
2541 err.to_string().contains(expected_error),
2542 "Expected error containing '{expected_error}', got: {err}"
2543 );
2544 }
2545
2546 #[rstest]
2547 #[case(
2548 r#"{"type": "array", "elementType": "integer", "containsNull": false}"#,
2549 DataType::Array(Box::new(ArrayType::new(DataType::INTEGER, false)))
2550 )]
2551 #[case(
2552 r#"{"type": "struct", "fields": [{"name": "a", "type": "integer", "nullable": false, "metadata": {}}, {"name": "b", "type": "string", "nullable": true, "metadata": {}}]}"#,
2553 DataType::Struct(Box::new(StructType::new_unchecked([
2554 StructField::new("a", DataType::INTEGER, false),
2555 StructField::new("b", DataType::STRING, true),
2556 ])))
2557 )]
2558 #[case(
2559 r#"{"type": "map", "keyType": "string", "valueType": "integer", "valueContainsNull": true}"#,
2560 DataType::Map(Box::new(MapType::new(DataType::STRING, DataType::INTEGER, true)))
2561 )]
2562 #[case("\"string\"", DataType::STRING)]
2563 #[case("\"long\"", DataType::LONG)]
2564 #[case("\"integer\"", DataType::INTEGER)]
2565 #[case("\"short\"", DataType::SHORT)]
2566 #[case("\"byte\"", DataType::BYTE)]
2567 #[case("\"float\"", DataType::FLOAT)]
2568 #[case("\"double\"", DataType::DOUBLE)]
2569 #[case("\"boolean\"", DataType::BOOLEAN)]
2570 #[case("\"binary\"", DataType::BINARY)]
2571 #[case("\"date\"", DataType::DATE)]
2572 #[case("\"timestamp\"", DataType::TIMESTAMP)]
2573 #[case("\"timestamp_ntz\"", DataType::TIMESTAMP_NTZ)]
2574 #[case("\"variant\"", DataType::unshredded_variant())]
2575 fn test_data_type_deserialization(#[case] type_json: &str, #[case] expected: DataType) {
2576 let data_type: DataType = serde_json::from_str(type_json).unwrap();
2577 assert_eq!(data_type, expected);
2578 }
2579
2580 #[test]
2581 fn test_make_physical_no_column_mapping() {
2582 let field = StructField::nullable(
2583 "e",
2584 ArrayType::new(
2585 StructType::new_unchecked([StructField::not_null("d", DataType::INTEGER)]).into(),
2586 true,
2587 ),
2588 );
2589 let physical_field = field.make_physical(ColumnMappingMode::None).unwrap();
2590
2591 assert_eq!(physical_field.name, "e");
2592 assert!(physical_field
2593 .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2594 .is_none());
2595 assert!(physical_field
2596 .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
2597 .is_none());
2598
2599 let DataType::Array(atype) = physical_field.data_type else {
2600 panic!("Expected an Array");
2601 };
2602 let DataType::Struct(stype) = atype.element_type else {
2603 panic!("Expected a Struct");
2604 };
2605 let struct_field = stype.fields.get_index(0).unwrap().1;
2606 assert_eq!(struct_field.name, "d");
2607 }
2608
2609 #[test]
2610 fn test_make_physical_rejects_annotated_fields_when_column_mapping_disabled() {
2611 let data = example_schema_metadata();
2612 let field: StructField = serde_json::from_str(data).unwrap();
2613 assert!(field.make_physical(ColumnMappingMode::None).is_err());
2614 }
2615
2616 #[test]
2617 fn test_make_physical_rejects_unannotated_leaf_in_deep_nesting() {
2618 let schema = test_deep_nested_schema_missing_leaf_cm();
2619 let field = schema.fields().next().unwrap();
2620 let err = field
2621 .make_physical(ColumnMappingMode::Name)
2622 .unwrap_err()
2623 .to_string();
2624 assert!(
2625 err.contains("top.`<array element>`.mid_field.`<map value>`.leaf"),
2626 "Expected full nested path in error, got: {err}"
2627 );
2628 }
2629
2630 #[test]
2631 fn test_make_physical_rejects_duplicate_column_mapping_ids() {
2632 use crate::schema::ColumnMetadataKey;
2633
2634 fn cm_field(name: &str, id: i64, data_type: impl Into<DataType>) -> StructField {
2635 StructField::not_null(name, data_type).with_metadata([
2636 (
2637 ColumnMetadataKey::ColumnMappingId.as_ref(),
2638 MetadataValue::Number(id),
2639 ),
2640 (
2641 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2642 MetadataValue::String(format!("col-{name}")),
2643 ),
2644 ])
2645 }
2646
2647 let inner = StructType::new_unchecked([
2648 cm_field("x", 3, DataType::INTEGER),
2649 cm_field("y", 4, DataType::STRING),
2650 ]);
2651 let schema = StructType::new_unchecked([
2652 cm_field("a", 1, DataType::INTEGER),
2653 cm_field(
2654 "b",
2655 2,
2656 ArrayType::new(DataType::Struct(Box::new(inner)), true),
2657 ),
2658 cm_field("c", 3, DataType::STRING),
2659 ]);
2660 assert_result_error_with_message(
2661 schema.make_physical(ColumnMappingMode::Id),
2662 "Duplicate column mapping ID",
2663 );
2664 }
2665
2666 #[rstest]
2667 #[case::accepted_same_phy_name_different_paths(fixtures::same_phy_name_different_paths(), None)]
2668 #[case::rejected_deeply_nested_repeat_physical_paths(
2669 fixtures::deeply_nested_repeat_physical_paths(),
2670 Some({
2671 let (a, b) =
2672 fixtures::deeply_nested_collider_paths();
2673 format!("assigned to both '{a}' and '{b}'")
2674 }),
2675 )]
2676 #[case::multiple_physical_name_collisions_reports_first(
2677 fixtures::multiple_physical_name_collisions(),
2678 Some("'p' assigned to both 'a' and 'b'".to_string()),
2679 )]
2680 fn test_make_physical_dup_physical_name(
2681 #[case] schema: StructType,
2682 #[case] expected_error_substring: Option<String>,
2683 ) {
2684 for mode in [ColumnMappingMode::Name, ColumnMappingMode::Id] {
2686 let result = schema.make_physical(mode);
2687 match &expected_error_substring {
2688 None => {
2689 result.expect("The input schema should be valid");
2690 }
2691 Some(substr) => {
2692 assert_result_error_with_message(result.as_ref().map(|_| ()), substr);
2693 if let Err(e) = &result {
2694 assert!(
2695 !e.to_string().contains("'q'"),
2696 "walker must short-circuit on first collision under {mode:?}; got: {e}"
2697 );
2698 }
2699 }
2700 }
2701 }
2702 }
2703
2704 #[test]
2705 fn test_make_physical_column_mapping() {
2706 [ColumnMappingMode::Name, ColumnMappingMode::Id]
2707 .into_iter()
2708 .for_each(|mode| {
2709 let data = example_schema_metadata();
2710
2711 let field: StructField = serde_json::from_str(data).unwrap();
2712
2713 let col_id = field
2714 .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2715 .unwrap();
2716 let id_start = field
2717 .get_config_value(&ColumnMetadataKey::IdentityStart)
2718 .unwrap();
2719 assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
2720 assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64));
2721 assert_eq!(
2722 field.physical_name(mode),
2723 "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2724 );
2725 let physical_field = field.make_physical(mode).unwrap();
2726
2727 match mode {
2729 ColumnMappingMode::Id => {
2730 assert!(matches!(
2731 physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2732 Some(MetadataValue::Number(4))
2733 ));
2734
2735 assert!(matches!(
2736 physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2737 Some(MetadataValue::Number(4))
2738 ));
2739 }
2740 ColumnMappingMode::Name => {
2741 assert!(matches!(
2742 physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2743 Some(MetadataValue::Number(4))
2744 ));
2745 assert!(matches!(
2746 physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2747 Some(MetadataValue::Number(4))
2748 ));
2749 }
2750 ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2751 }
2752
2753 assert_eq!(
2754 physical_field.name,
2755 "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2756 );
2757 let DataType::Array(atype) = physical_field.data_type else {
2758 panic!("Expected an Array");
2759 };
2760 let DataType::Struct(stype) = atype.element_type else {
2761 panic!("Expected a Struct");
2762 };
2763
2764 let struct_field = stype.fields.get_index(0).unwrap().1;
2765 assert_eq!(
2766 struct_field.name,
2767 "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
2768 );
2769
2770 match mode {
2772 ColumnMappingMode::Id => {
2773 assert!(matches!(
2774 struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2775 Some(MetadataValue::Number(5))
2776 ));
2777 assert!(matches!(
2778 struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2779 Some(MetadataValue::Number(5))
2780 ));
2781 }
2782 ColumnMappingMode::Name => {
2783 assert!(matches!(
2784 struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2785 Some(MetadataValue::Number(5))
2786 ));
2787 assert!(matches!(
2788 struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2789 Some(MetadataValue::Number(5))
2790 ));
2791 }
2792 ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2793 }
2794 });
2795 }
2796
2797 #[test]
2798 fn test_make_physical_passes_metadata_column_through() {
2799 let field = StructField::create_metadata_column(
2800 "_metadata.row_index",
2801 MetadataColumnSpec::RowIndex,
2802 );
2803 for mode in [
2804 ColumnMappingMode::None,
2805 ColumnMappingMode::Name,
2806 ColumnMappingMode::Id,
2807 ] {
2808 let physical = field.make_physical(mode).unwrap();
2809 assert_eq!(physical.name(), "_metadata.row_index");
2810 assert!(physical.is_metadata_column());
2811 }
2812 }
2813
2814 #[test]
2815 fn test_make_physical_rejects_metadata_column_with_cm_annotations() {
2816 let field = StructField::create_metadata_column(
2817 "_metadata.row_index",
2818 MetadataColumnSpec::RowIndex,
2819 )
2820 .add_metadata([(
2821 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2822 MetadataValue::String("phys".to_string()),
2823 )]);
2824 assert_result_error_with_message(
2825 field.make_physical(ColumnMappingMode::Name),
2826 "must not have column mapping annotations",
2827 );
2828 }
2829
2830 #[test]
2831 fn test_read_schemas() {
2832 let file = std::fs::File::open("./tests/serde/schema.json").unwrap();
2833 let schema: Result<Schema, _> = serde_json::from_reader(file);
2834 assert!(schema.is_ok());
2835
2836 let file = std::fs::File::open("./tests/serde/checkpoint_schema.json").unwrap();
2837 let schema: Result<Schema, _> = serde_json::from_reader(file);
2838 assert!(schema.is_ok())
2839 }
2840
2841 #[test]
2842 fn test_invalid_decimal() {
2843 let data = r#"
2844 {
2845 "name": "a",
2846 "type": "decimal(39, 10)",
2847 "nullable": false,
2848 "metadata": {}
2849 }
2850 "#;
2851 assert!(serde_json::from_str::<StructField>(data).is_err());
2852
2853 let data = r#"
2854 {
2855 "name": "a",
2856 "type": "decimal(10, 39)",
2857 "nullable": false,
2858 "metadata": {}
2859 }
2860 "#;
2861 assert!(serde_json::from_str::<StructField>(data).is_err());
2862 }
2863
2864 #[test]
2865 fn test_metadata_value_to_string() {
2866 assert_eq!(MetadataValue::Number(0).to_string(), "0");
2867 assert_eq!(
2868 MetadataValue::String("hello".to_string()).to_string(),
2869 "hello"
2870 );
2871 assert_eq!(MetadataValue::Boolean(true).to_string(), "true");
2872 assert_eq!(MetadataValue::Boolean(false).to_string(), "false");
2873 let object_json = serde_json::json!({ "an": "object" });
2874 assert_eq!(
2875 MetadataValue::Other(object_json).to_string(),
2876 "{\"an\":\"object\"}"
2877 );
2878 let array_json = serde_json::json!(["an", "array"]);
2879 assert_eq!(
2880 MetadataValue::Other(array_json).to_string(),
2881 "[\"an\",\"array\"]"
2882 );
2883 }
2884
2885 #[test]
2886 fn test_num_fields() {
2887 let schema = StructType::new_unchecked([]);
2888 assert!(schema.num_fields() == 0);
2889 let schema = StructType::new_unchecked([
2890 StructField::nullable("a", DataType::LONG),
2891 StructField::nullable("b", DataType::LONG),
2892 StructField::nullable("c", DataType::LONG),
2893 StructField::nullable("d", DataType::LONG),
2894 ]);
2895 assert_eq!(schema.num_fields(), 4);
2896 let schema = StructType::new_unchecked([
2897 StructField::nullable("b", DataType::LONG),
2898 StructField::not_null("b", DataType::LONG),
2899 StructField::nullable("c", DataType::LONG),
2900 StructField::nullable("c", DataType::LONG),
2901 ]);
2902 assert_eq!(schema.num_fields(), 2);
2903 }
2904
2905 #[test]
2906 fn test_has_invariants() {
2907 let schema = StructType::new_unchecked([
2909 StructField::nullable("a", DataType::STRING),
2910 StructField::nullable("b", DataType::INTEGER),
2911 ]);
2912 assert!(!schema_has_invariants(&schema));
2913
2914 let mut field = StructField::nullable("c", DataType::STRING);
2916 field.metadata.insert(
2917 ColumnMetadataKey::Invariants.as_ref().to_string(),
2918 MetadataValue::String("c > 0".to_string()),
2919 );
2920
2921 let schema =
2922 StructType::new_unchecked([StructField::nullable("a", DataType::STRING), field]);
2923 assert!(schema_has_invariants(&schema));
2924
2925 let nested_field = StructField::nullable(
2927 "nested_c",
2928 DataType::try_struct_type([{
2929 let mut field = StructField::nullable("d", DataType::INTEGER);
2930 field.metadata.insert(
2931 ColumnMetadataKey::Invariants.as_ref().to_string(),
2932 MetadataValue::String("d > 0".to_string()),
2933 );
2934 field
2935 }])
2936 .unwrap(),
2937 );
2938
2939 let schema = StructType::new_unchecked([
2940 StructField::nullable("a", DataType::STRING),
2941 StructField::nullable("b", DataType::INTEGER),
2942 nested_field,
2943 ]);
2944 assert!(schema_has_invariants(&schema));
2945
2946 let array_field = StructField::nullable(
2948 "array_field",
2949 ArrayType::new(
2950 DataType::try_struct_type([{
2951 let mut field = StructField::nullable("d", DataType::INTEGER);
2952 field.metadata.insert(
2953 ColumnMetadataKey::Invariants.as_ref().to_string(),
2954 MetadataValue::String("d > 0".to_string()),
2955 );
2956 field
2957 }])
2958 .unwrap(),
2959 true,
2960 ),
2961 );
2962
2963 let schema = StructType::new_unchecked([
2964 StructField::nullable("a", DataType::STRING),
2965 StructField::nullable("b", DataType::INTEGER),
2966 array_field,
2967 ]);
2968 assert!(schema_has_invariants(&schema));
2969
2970 let map_field = StructField::nullable(
2972 "map_field",
2973 MapType::new(
2974 DataType::STRING,
2975 DataType::try_struct_type([{
2976 let mut field = StructField::nullable("d", DataType::INTEGER);
2977 field.metadata.insert(
2978 ColumnMetadataKey::Invariants.as_ref().to_string(),
2979 MetadataValue::String("d > 0".to_string()),
2980 );
2981 field
2982 }])
2983 .unwrap(),
2984 true,
2985 ),
2986 );
2987
2988 let schema = StructType::new_unchecked([
2989 StructField::nullable("a", DataType::STRING),
2990 StructField::nullable("b", DataType::INTEGER),
2991 map_field,
2992 ]);
2993 assert!(schema_has_invariants(&schema));
2994 }
2995
2996 fn all_nullable_schema() -> StructType {
2997 StructType::new_unchecked([
2998 StructField::nullable("a", DataType::STRING),
2999 StructField::nullable("b", DataType::INTEGER),
3000 ])
3001 }
3002
3003 fn top_level_non_null_schema() -> StructType {
3004 StructType::new_unchecked([
3005 StructField::not_null("id", DataType::INTEGER),
3006 StructField::nullable("name", DataType::STRING),
3007 ])
3008 }
3009
3010 fn nested_non_null_schema() -> StructType {
3011 let nested_field = StructField::nullable(
3012 "parent",
3013 DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)]).unwrap(),
3014 );
3015 StructType::new_unchecked([StructField::nullable("a", DataType::STRING), nested_field])
3016 }
3017
3018 fn array_non_null_schema() -> StructType {
3019 let array_field = StructField::nullable(
3020 "arr",
3021 ArrayType::new(
3022 DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3023 .unwrap(),
3024 true,
3025 ),
3026 );
3027 StructType::new_unchecked([array_field])
3028 }
3029
3030 fn map_non_null_schema() -> StructType {
3031 let map_field = StructField::nullable(
3032 "map",
3033 MapType::new(
3034 DataType::STRING,
3035 DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3036 .unwrap(),
3037 true,
3038 ),
3039 );
3040 StructType::new_unchecked([map_field])
3041 }
3042
3043 fn variant_only_schema() -> StructType {
3044 StructType::new_unchecked([StructField::nullable("v", DataType::unshredded_variant())])
3047 }
3048
3049 #[rstest]
3050 #[case::all_nullable(all_nullable_schema(), false)]
3051 #[case::top_level(top_level_non_null_schema(), true)]
3052 #[case::nested_struct(nested_non_null_schema(), true)]
3053 #[case::array_element(array_non_null_schema(), true)]
3054 #[case::map_value(map_non_null_schema(), true)]
3055 #[case::variant_skipped(variant_only_schema(), false)]
3056 fn test_schema_contains_non_null_fields(#[case] schema: StructType, #[case] expected: bool) {
3057 assert_eq!(schema_contains_non_null_fields(&schema), expected);
3058 }
3059
3060 #[test]
3061 fn test_struct_type_iterator_basic() {
3062 let fields = vec![
3063 StructField::new("field1", DataType::STRING, true),
3064 StructField::new("field2", DataType::INTEGER, false),
3065 StructField::new("field3", DataType::BOOLEAN, true),
3066 ];
3067 let struct_type = StructType::new_unchecked(fields.clone());
3068
3069 let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3071 assert_eq!(field_names, vec!["field1", "field2", "field3"]);
3072
3073 assert_eq!(struct_type.field("field1").unwrap().name, "field1");
3075 }
3076
3077 #[test]
3078 fn test_struct_type_into_iterator_owned() {
3079 let fields = vec![
3080 StructField::new("a", DataType::STRING, true),
3081 StructField::new("b", DataType::INTEGER, false),
3082 ];
3083 let struct_type = StructType::new_unchecked(fields);
3084
3085 let mut field_names = Vec::new();
3087 for field in struct_type {
3088 field_names.push(field.name);
3089 }
3090 assert_eq!(field_names, vec!["a", "b"]);
3091 }
3092
3093 #[test]
3094 fn test_struct_type_into_iterator_references() {
3095 let fields = vec![
3096 StructField::new("x", DataType::DOUBLE, true),
3097 StructField::new("y", DataType::FLOAT, false),
3098 StructField::new("z", DataType::LONG, true),
3099 ];
3100 let struct_type = StructType::new_unchecked(fields);
3101
3102 let mut field_names = Vec::new();
3104 for field in &struct_type {
3105 field_names.push(field.name().clone());
3106 }
3107 assert_eq!(field_names, vec!["x", "y", "z"]);
3108
3109 assert_eq!(struct_type.field("x").unwrap().name, "x");
3111 }
3112
3113 #[test]
3114 fn test_iterator_exact_size() {
3115 let fields = vec![
3116 StructField::new("field1", DataType::STRING, true),
3117 StructField::new("field2", DataType::INTEGER, false),
3118 StructField::new("field3", DataType::BOOLEAN, true),
3119 StructField::new("field4", DataType::DATE, true),
3120 ];
3121
3122 let struct_type = StructType::new_unchecked(fields.clone());
3124 let ref_iter = struct_type.fields();
3125 assert_eq!(ref_iter.len(), 4);
3126
3127 let struct_type = StructType::new_unchecked(fields.clone());
3129 let into_ref_iter = (&struct_type).into_iter();
3130 assert_eq!(into_ref_iter.len(), 4);
3131
3132 let struct_type = StructType::new_unchecked(fields);
3134 let into_owned_iter = struct_type.into_iter();
3135 assert_eq!(into_owned_iter.len(), 4);
3136 }
3137
3138 #[test]
3139 fn test_iterator_with_metadata() {
3140 let field_with_metadata = StructField::new("test_field", DataType::STRING, true)
3141 .with_metadata([("key1", MetadataValue::String("value1".to_string()))]);
3142
3143 let struct_type = StructType::new_unchecked([field_with_metadata]);
3144
3145 for field in &struct_type {
3147 assert_eq!(field.metadata().len(), 1);
3148 assert_eq!(
3149 field.metadata().get("key1"),
3150 Some(&MetadataValue::String("value1".to_string()))
3151 );
3152 }
3153
3154 for field in struct_type {
3156 assert_eq!(field.metadata().len(), 1);
3157 assert_eq!(
3158 field.metadata().get("key1"),
3159 Some(&MetadataValue::String("value1".to_string()))
3160 );
3161 }
3162 }
3163
3164 #[test]
3165 fn test_empty_struct_type_iterator() {
3166 let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3167
3168 assert_eq!(struct_type.fields().count(), 0);
3170 assert_eq!((&struct_type).into_iter().count(), 0);
3171 assert_eq!(struct_type.into_iter().count(), 0);
3172 }
3173
3174 #[test]
3175 fn test_iterator_order_preservation() {
3176 let fields = vec![
3177 StructField::new("zebra", DataType::STRING, true),
3178 StructField::new("apple", DataType::INTEGER, false),
3179 StructField::new("banana", DataType::BOOLEAN, true),
3180 ];
3181 let struct_type = StructType::new_unchecked(fields);
3182
3183 let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3185 assert_eq!(field_names, vec!["zebra", "apple", "banana"]);
3186
3187 let ref_names: Vec<_> = (&struct_type).into_iter().map(|f| f.name()).collect();
3189 assert_eq!(ref_names, vec!["zebra", "apple", "banana"]);
3190
3191 let owned_names: Vec<_> = struct_type.into_iter().map(|f| f.name).collect();
3193 assert_eq!(owned_names, vec!["zebra", "apple", "banana"]);
3194 }
3195
3196 #[test]
3197 fn test_iterator_collect() {
3198 let original_fields = vec![
3199 StructField::new("field1", DataType::STRING, true),
3200 StructField::new("field2", DataType::INTEGER, false),
3201 ];
3202 let struct_type = StructType::new_unchecked(original_fields.clone());
3203
3204 let collected_refs: Vec<&StructField> = struct_type.fields().collect();
3206 assert_eq!(collected_refs.len(), 2);
3207 assert_eq!(collected_refs[0].name, "field1");
3208 assert_eq!(collected_refs[1].name, "field2");
3209
3210 let collected_owned: Vec<StructField> = struct_type.into_iter().collect();
3212 assert_eq!(collected_owned.len(), 2);
3213 assert_eq!(collected_owned[0].name, "field1");
3214 assert_eq!(collected_owned[1].name, "field2");
3215 }
3216
3217 #[test]
3218 fn test_iterator_functional_methods() {
3219 let fields = vec![
3220 StructField::new("nullable_string", DataType::STRING, true),
3221 StructField::new("required_int", DataType::INTEGER, false),
3222 StructField::new("nullable_bool", DataType::BOOLEAN, true),
3223 StructField::new("required_long", DataType::LONG, false),
3224 ];
3225 let struct_type = StructType::new_unchecked(fields);
3226
3227 let nullable_count = struct_type.fields().filter(|f| f.is_nullable()).count();
3229 assert_eq!(nullable_count, 2);
3230
3231 let required_field_names: Vec<_> = struct_type
3233 .fields()
3234 .filter(|f| !f.is_nullable())
3235 .map(|f| f.name())
3236 .collect();
3237 assert_eq!(required_field_names, vec!["required_int", "required_long"]);
3238
3239 for (index, field) in struct_type.fields().enumerate() {
3241 match index {
3242 0 => assert_eq!(field.name, "nullable_string"),
3243 1 => assert_eq!(field.name, "required_int"),
3244 2 => assert_eq!(field.name, "nullable_bool"),
3245 3 => assert_eq!(field.name, "required_long"),
3246 _ => panic!("Unexpected field index: {index}"),
3247 }
3248 }
3249 }
3250
3251 #[test]
3252 fn test_double_ended_iterator_ref() {
3253 let fields = vec![
3254 StructField::new("first", DataType::STRING, true),
3255 StructField::new("second", DataType::INTEGER, false),
3256 StructField::new("third", DataType::BOOLEAN, true),
3257 StructField::new("fourth", DataType::LONG, false),
3258 ];
3259 let struct_type = StructType::new_unchecked(fields);
3260
3261 let mut iter = struct_type.fields();
3263
3264 assert_eq!(iter.next().unwrap().name, "first");
3266 assert_eq!(iter.next().unwrap().name, "second");
3267
3268 assert_eq!(iter.next_back().unwrap().name, "fourth");
3270 assert_eq!(iter.next_back().unwrap().name, "third");
3271
3272 assert!(iter.next().is_none());
3274 assert!(iter.next_back().is_none());
3275 }
3276
3277 #[test]
3278 fn test_double_ended_iterator_owned() {
3279 let fields = vec![
3280 StructField::new("alpha", DataType::STRING, true),
3281 StructField::new("beta", DataType::INTEGER, false),
3282 StructField::new("gamma", DataType::BOOLEAN, true),
3283 ];
3284 let struct_type = StructType::new_unchecked(fields);
3285
3286 let mut iter = struct_type.into_iter();
3288
3289 assert_eq!(iter.next_back().unwrap().name, "gamma");
3291
3292 assert_eq!(iter.next().unwrap().name, "alpha");
3294
3295 assert_eq!(iter.next_back().unwrap().name, "beta");
3297
3298 assert!(iter.next().is_none());
3300 assert!(iter.next_back().is_none());
3301 }
3302
3303 #[test]
3304 fn test_double_ended_iterator_collect_reverse() {
3305 let fields = vec![
3306 StructField::new("one", DataType::STRING, true),
3307 StructField::new("two", DataType::INTEGER, false),
3308 StructField::new("three", DataType::BOOLEAN, true),
3309 ];
3310 let struct_type = StructType::new_unchecked(fields);
3311
3312 let reversed_names: Vec<_> = struct_type.fields().rev().map(|f| f.name()).collect();
3314 assert_eq!(reversed_names, vec!["three", "two", "one"]);
3315
3316 assert_eq!(struct_type.field("two").unwrap().name, "two");
3318 }
3319
3320 #[test]
3321 fn test_double_ended_iterator_with_into_iter_ref() {
3322 let fields = vec![
3323 StructField::new("x", DataType::DOUBLE, true),
3324 StructField::new("y", DataType::FLOAT, false),
3325 StructField::new("z", DataType::LONG, true),
3326 ];
3327 let struct_type = StructType::new_unchecked(fields);
3328
3329 let mut iter = (&struct_type).into_iter();
3331
3332 assert_eq!(iter.next().unwrap().name, "x");
3334 assert_eq!(iter.next_back().unwrap().name, "z");
3335 assert_eq!(iter.next().unwrap().name, "y");
3336
3337 assert!(iter.next().is_none());
3339 assert!(iter.next_back().is_none());
3340 }
3341
3342 #[test]
3343 fn test_fused_iterator_ref() {
3344 let fields = vec![
3345 StructField::new("test1", DataType::STRING, true),
3346 StructField::new("test2", DataType::INTEGER, false),
3347 ];
3348 let struct_type = StructType::new_unchecked(fields);
3349
3350 let mut iter = struct_type.fields();
3352
3353 assert!(iter.next().is_some());
3355 assert!(iter.next().is_some());
3356 assert!(iter.next().is_none());
3357
3358 assert!(iter.next().is_none());
3360 assert!(iter.next().is_none());
3361 assert!(iter.next().is_none());
3362 }
3363
3364 #[test]
3365 fn test_fused_iterator_owned() {
3366 let fields = vec![
3367 StructField::new("item1", DataType::STRING, true),
3368 StructField::new("item2", DataType::INTEGER, false),
3369 ];
3370 let struct_type = StructType::new_unchecked(fields);
3371
3372 let mut iter = struct_type.into_iter();
3374
3375 assert!(iter.next().is_some());
3377 assert!(iter.next().is_some());
3378 assert!(iter.next().is_none());
3379
3380 assert!(iter.next().is_none());
3382 assert!(iter.next().is_none());
3383 assert!(iter.next().is_none());
3384 }
3385
3386 #[test]
3387 fn test_fused_iterator_with_into_iter_ref() {
3388 let fields = vec![StructField::new("field_a", DataType::BOOLEAN, true)];
3389 let struct_type = StructType::new_unchecked(fields);
3390
3391 let mut iter = (&struct_type).into_iter();
3393
3394 assert!(iter.next().is_some());
3396 assert!(iter.next().is_none());
3397
3398 assert!(iter.next().is_none());
3400 assert!(iter.next().is_none());
3401 }
3402
3403 #[test]
3404 fn test_fused_double_ended_iterator_empty() {
3405 let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3406
3407 let mut iter = struct_type.fields();
3409
3410 assert!(iter.next().is_none());
3412 assert!(iter.next_back().is_none());
3413
3414 assert!(iter.next().is_none());
3416 assert!(iter.next_back().is_none());
3417 }
3418
3419 #[test]
3420 fn test_double_ended_iterator_single_element() {
3421 let fields = vec![StructField::new("single", DataType::STRING, true)];
3422 let struct_type = StructType::new_unchecked(fields);
3423
3424 let mut iter = struct_type.fields();
3426
3427 assert_eq!(iter.next().unwrap().name, "single");
3429 assert!(iter.next().is_none());
3430 assert!(iter.next_back().is_none());
3431
3432 let struct_type =
3434 StructType::new_unchecked([StructField::new("single2", DataType::INTEGER, false)]);
3435 let mut iter = struct_type.into_iter();
3436
3437 assert_eq!(iter.next_back().unwrap().name, "single2");
3438 assert!(iter.next().is_none());
3439 assert!(iter.next_back().is_none());
3440 }
3441
3442 #[test]
3443 fn test_metadata_column_spec() -> DeltaResult<()> {
3444 assert_eq!(MetadataColumnSpec::RowIndex.text_value(), "row_index");
3446 assert_eq!(MetadataColumnSpec::RowId.text_value(), "row_id");
3447 assert_eq!(
3448 MetadataColumnSpec::RowCommitVersion.text_value(),
3449 "row_commit_version"
3450 );
3451 assert_eq!(MetadataColumnSpec::FilePath.text_value(), "_file");
3452
3453 assert_eq!(MetadataColumnSpec::RowIndex.data_type(), DataType::LONG);
3455 assert_eq!(MetadataColumnSpec::RowId.data_type(), DataType::LONG);
3456 assert_eq!(
3457 MetadataColumnSpec::RowCommitVersion.data_type(),
3458 DataType::LONG
3459 );
3460 assert_eq!(MetadataColumnSpec::FilePath.data_type(), DataType::STRING);
3461
3462 assert!(!MetadataColumnSpec::RowIndex.nullable());
3464 assert!(!MetadataColumnSpec::RowId.nullable());
3465 assert!(!MetadataColumnSpec::RowCommitVersion.nullable());
3466 assert!(!MetadataColumnSpec::FilePath.nullable());
3467
3468 assert_eq!(MetadataColumnSpec::RowIndex.reserved_field_id(), None);
3470 assert_eq!(MetadataColumnSpec::RowId.reserved_field_id(), None);
3471 assert_eq!(
3472 MetadataColumnSpec::RowCommitVersion.reserved_field_id(),
3473 None
3474 );
3475 assert_eq!(
3476 MetadataColumnSpec::FilePath.reserved_field_id(),
3477 Some(crate::reserved_field_ids::FILE_NAME)
3478 );
3479
3480 assert_eq!(
3482 MetadataColumnSpec::from_str("row_index")?,
3483 MetadataColumnSpec::RowIndex
3484 );
3485 assert_eq!(
3486 MetadataColumnSpec::from_str("row_id")?,
3487 MetadataColumnSpec::RowId
3488 );
3489 assert_eq!(
3490 MetadataColumnSpec::from_str("row_commit_version")?,
3491 MetadataColumnSpec::RowCommitVersion
3492 );
3493 assert_eq!(
3494 MetadataColumnSpec::from_str("_file")?,
3495 MetadataColumnSpec::FilePath
3496 );
3497
3498 assert!(MetadataColumnSpec::from_str("invalid").is_err());
3500
3501 Ok(())
3502 }
3503
3504 #[test]
3505 fn test_create_metadata_column() {
3506 let field =
3507 StructField::create_metadata_column("test_row_index", MetadataColumnSpec::RowIndex);
3508
3509 assert_eq!(field.name(), "test_row_index");
3510 assert_eq!(field.data_type(), &DataType::LONG);
3511 assert!(!field.nullable);
3512 assert!(field.is_metadata_column());
3513 assert_eq!(
3514 field.get_metadata_column_spec(),
3515 Some(MetadataColumnSpec::RowIndex)
3516 );
3517 }
3518
3519 #[test]
3520 fn test_default_row_index_column() {
3521 let field = StructField::default_row_index_column();
3522
3523 assert_eq!(field.name(), "_metadata.row_index");
3524 assert_eq!(field.data_type(), &DataType::LONG);
3525 assert!(!field.nullable);
3526 assert!(field.is_metadata_column());
3527 assert_eq!(
3528 field.get_metadata_column_spec(),
3529 Some(MetadataColumnSpec::RowIndex)
3530 );
3531 }
3532
3533 #[test]
3534 fn test_add_column() -> DeltaResult<()> {
3535 let schema = StructType::try_new([StructField::nullable("col1", DataType::STRING)])?;
3536
3537 let new_field = StructField::nullable("col2", DataType::INTEGER);
3538 let updated_schema = schema.add([new_field])?;
3539
3540 assert_eq!(updated_schema.fields().count(), 2);
3541 assert!(updated_schema.contains("col1"));
3542 assert!(updated_schema.contains("col2"));
3543 Ok(())
3544 }
3545
3546 #[test]
3547 fn test_add_metadata_column() -> DeltaResult<()> {
3548 let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3549
3550 let schema_with_metadata =
3551 schema.add_metadata_column("my_row_index", MetadataColumnSpec::RowIndex)?;
3552
3553 assert_eq!(schema_with_metadata.fields().count(), 2);
3554 assert!(schema_with_metadata.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3555 assert!(schema_with_metadata.contains("my_row_index"));
3556 assert_eq!(
3557 schema_with_metadata.index_of_metadata_column(&MetadataColumnSpec::RowIndex),
3558 Some(&1)
3559 );
3560 Ok(())
3561 }
3562
3563 #[test]
3564 fn test_duplicate_metadata_columns() -> DeltaResult<()> {
3565 let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3566
3567 let schema_with_metadata =
3568 schema.add_metadata_column("row_index1", MetadataColumnSpec::RowIndex)?;
3569
3570 let result =
3572 schema_with_metadata.add_metadata_column("row_index2", MetadataColumnSpec::RowIndex);
3573
3574 assert_result_error_with_message(result, "Duplicate metadata column");
3575 Ok(())
3576 }
3577
3578 #[test]
3579 fn test_duplicate_field_name_case_insensitive() {
3580 let result = StructType::try_new([
3582 StructField::nullable("Value", DataType::INTEGER),
3583 StructField::nullable("value", DataType::STRING),
3584 ]);
3585 assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3586 }
3587
3588 #[test]
3589 fn test_duplicate_field_name_exact() {
3590 let result = StructType::try_new([
3592 StructField::nullable("id", DataType::INTEGER),
3593 StructField::nullable("id", DataType::STRING),
3594 ]);
3595 assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3596 }
3597
3598 #[test]
3599 fn test_nested_metadata_columns_validation_struct() -> DeltaResult<()> {
3600 let nested_field_with_metadata =
3602 StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3603 let nested_struct = StructType {
3604 type_name: "struct".into(),
3605 fields: [(
3606 nested_field_with_metadata.name.clone(),
3607 nested_field_with_metadata,
3608 )]
3609 .into_iter()
3610 .collect(),
3611 metadata_columns: HashMap::new(),
3612 };
3613
3614 let result = StructType::try_new([
3615 StructField::nullable("regular_col", DataType::STRING),
3616 StructField::nullable("nested", DataType::Struct(Box::new(nested_struct))),
3617 ]);
3618
3619 assert_result_error_with_message(result, "only allowed at the top level");
3620 Ok(())
3621 }
3622
3623 #[test]
3624 fn test_nested_metadata_columns_validation_array() -> DeltaResult<()> {
3625 let nested_field_with_metadata =
3627 StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3628 let nested_struct = StructType {
3629 type_name: "struct".into(),
3630 fields: [(
3631 nested_field_with_metadata.name.clone(),
3632 nested_field_with_metadata,
3633 )]
3634 .into_iter()
3635 .collect(),
3636 metadata_columns: HashMap::new(),
3637 };
3638 let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct)), true);
3639
3640 let result = StructType::try_new([
3641 StructField::nullable("regular_col", DataType::STRING),
3642 StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
3643 ]);
3644
3645 assert_result_error_with_message(result, "only allowed at the top level");
3646 Ok(())
3647 }
3648
3649 #[test]
3650 fn test_nested_metadata_columns_validation_map() -> DeltaResult<()> {
3651 let nested_field_with_metadata =
3653 StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3654 let nested_struct = StructType {
3655 type_name: "struct".into(),
3656 fields: [(
3657 nested_field_with_metadata.name.clone(),
3658 nested_field_with_metadata,
3659 )]
3660 .into_iter()
3661 .collect(),
3662 metadata_columns: HashMap::new(),
3663 };
3664
3665 for map_type in [
3666 MapType::new(
3667 DataType::Struct(Box::new(nested_struct.clone())),
3668 DataType::STRING,
3669 true,
3670 ),
3671 MapType::new(
3672 DataType::STRING,
3673 DataType::Struct(Box::new(nested_struct)),
3674 true,
3675 ),
3676 ] {
3677 let result = StructType::try_new([
3678 StructField::nullable("regular_col", DataType::STRING),
3679 StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
3680 ]);
3681
3682 assert_result_error_with_message(result, "only allowed at the top level");
3683 }
3684
3685 Ok(())
3686 }
3687
3688 #[test]
3689 fn test_column_identifier_trait() -> DeltaResult<()> {
3690 let schema = StructType::try_new([
3691 StructField::nullable("regular_col", DataType::STRING),
3692 StructField::create_metadata_column("row_index_col", MetadataColumnSpec::RowIndex),
3693 ])?;
3694
3695 assert!(schema.contains("regular_col"));
3697 assert!(schema.contains("row_index_col"));
3698 assert!(!schema.contains("nonexistent"));
3699
3700 assert!(schema.contains("regular_col"));
3702 assert!(schema.contains("row_index_col"));
3703
3704 assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3706 assert!(!schema.contains_metadata_column(&MetadataColumnSpec::RowId));
3707 Ok(())
3708 }
3709
3710 #[test]
3711 fn test_metadata_column_serialization() -> DeltaResult<()> {
3712 let field = StructField::create_metadata_column("test_row_id", MetadataColumnSpec::RowId);
3713
3714 let json = serde_json::to_string(&field)?;
3716 let deserialized: StructField = serde_json::from_str(&json)?;
3717
3718 assert_eq!(deserialized.name(), field.name());
3719 assert_eq!(deserialized.data_type(), field.data_type());
3720 assert_eq!(deserialized.nullable, field.nullable);
3721 assert!(deserialized.is_metadata_column());
3722 assert_eq!(
3723 deserialized.get_metadata_column_spec(),
3724 Some(MetadataColumnSpec::RowId)
3725 );
3726 Ok(())
3727 }
3728
#[test]
fn test_all_metadata_column_specs() -> DeltaResult<()> {
    // Start from a single regular column and append one metadata column per spec.
    let mut schema =
        StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
    for (column_name, spec) in [
        ("row_index", MetadataColumnSpec::RowIndex),
        ("row_id", MetadataColumnSpec::RowId),
        ("row_commit_version", MetadataColumnSpec::RowCommitVersion),
    ] {
        schema = schema.add_metadata_column(column_name, spec)?;
    }

    assert_eq!(schema.fields().count(), 4);

    // Each spec is discoverable and sits where it was appended
    // (position 0 is the regular column).
    for (spec, position) in [
        (MetadataColumnSpec::RowIndex, 1_usize),
        (MetadataColumnSpec::RowId, 2),
        (MetadataColumnSpec::RowCommitVersion, 3),
    ] {
        assert!(schema.contains_metadata_column(&spec));
        assert_eq!(schema.index_of_metadata_column(&spec), Some(&position));
    }
    Ok(())
}
3757
#[test]
fn test_physical_name_with_mode_none() {
    // With ColumnMappingMode::None the physicalName annotation is ignored and the
    // logical name is returned.
    let json = r#"{
        "name": "logical_name",
        "type": "string",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.physicalName": "physical_name_col123"
        }
    }"#;
    let field = serde_json::from_str::<StructField>(json).unwrap();

    let resolved = field.physical_name(ColumnMappingMode::None);
    assert_eq!(resolved, "logical_name");
}
3773
#[test]
fn test_physical_name_with_mode_id() {
    // In Id mode the annotated physicalName takes precedence over the logical name.
    let json = r#"{
        "name": "logical_name",
        "type": "string",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.id": 5,
            "delta.columnMapping.physicalName": "physical_name_col123"
        }
    }"#;
    let field = serde_json::from_str::<StructField>(json).unwrap();

    let resolved = field.physical_name(ColumnMappingMode::Id);
    assert_eq!(resolved, "physical_name_col123");
}
3793
#[test]
fn test_physical_name_with_mode_name() {
    // In Name mode the annotated physicalName takes precedence over the logical name.
    let json = r#"{
        "name": "logical_name",
        "type": "string",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.physicalName": "physical_name_col456"
        }
    }"#;
    let field = serde_json::from_str::<StructField>(json).unwrap();

    let resolved = field.physical_name(ColumnMappingMode::Name);
    assert_eq!(resolved, "physical_name_col456");
}
3812
#[test]
fn test_physical_name_fallback_id() {
    // With no physicalName annotation, Id mode falls back to the logical name.
    let json = r#"{
        "name": "logical_name",
        "type": "string",
        "nullable": true,
        "metadata": {}
    }"#;
    let field = serde_json::from_str::<StructField>(json).unwrap();

    let resolved = field.physical_name(ColumnMappingMode::Id);
    assert_eq!(resolved, "logical_name");
}
3826
#[test]
fn test_physical_name_fallback_name() {
    // With no physicalName annotation, Name mode falls back to the logical name.
    let json = r#"{
        "name": "logical_name",
        "type": "string",
        "nullable": true,
        "metadata": {}
    }"#;
    let field = serde_json::from_str::<StructField>(json).unwrap();

    let resolved = field.physical_name(ColumnMappingMode::Name);
    assert_eq!(resolved, "logical_name");
}
3840
#[test]
fn test_display_struct_type_stable_output() -> DeltaResult<()> {
    // Pins the exact `Display` (`to_string`) rendering of a StructType, so any change
    // to the tree-style output format makes this test fail.

    // A metadata column nested inside a struct; the expected output below shows its
    // `delta.metadataSpec` metadata entry.
    let nested_field_with_metadata =
        StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
    let inner_struct =
        StructType::new_unchecked([StructField::new("q", DataType::LONG, false)]);
    // Two levels of struct nesting: nested_struct contains inner_struct.
    let nested_struct = StructType::new_unchecked([
        nested_field_with_metadata,
        StructField::new("x", DataType::DOUBLE, true),
        StructField::new(
            "inner_struct",
            DataType::Struct(Box::new(inner_struct)),
            false,
        ),
    ]);
    // The same nested struct is reused as an array element and as both map key and map
    // value. The expected output therefore contains array_element / map_key / map_value
    // subtrees.
    let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct.clone())), true);
    let map_type = MapType::new(
        DataType::Struct(Box::new(nested_struct.clone())),
        DataType::Struct(Box::new(nested_struct.clone())), true,
    );
    let fields = vec![
        StructField::new("x", DataType::DOUBLE, true),
        StructField::new("y", DataType::FLOAT, false),
        StructField::new("z", DataType::LONG, true),
        StructField::new("s", nested_struct.clone(), false),
        StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
        StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
        StructField::new("a", DataType::LONG, true),
    ];

    let struct_type = StructType::new_unchecked(fields);
    // The expected text lists every field with its type, nullability and metadata,
    // recursing into structs, array elements and map keys/values.
    assert_eq!(
        struct_type.to_string(),
        "struct:
├─x: double (is nullable: true, metadata: {})
├─y: float (is nullable: false, metadata: {})
├─z: long (is nullable: true, metadata: {})
├─s: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>> (is nullable: false, metadata: {})
│ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ ├─x: double (is nullable: true, metadata: {})
│ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ └─q: long (is nullable: false, metadata: {})
├─array_col: array<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
│ └─array_element: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
│ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ ├─x: double (is nullable: true, metadata: {})
│ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ └─q: long (is nullable: false, metadata: {})
├─map_col: map<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>, struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
│ ├─map_key: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
│ │ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ │ ├─x: double (is nullable: true, metadata: {})
│ │ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ │ └─q: long (is nullable: false, metadata: {})
│ └─map_value: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
│ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ ├─x: double (is nullable: true, metadata: {})
│ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ └─q: long (is nullable: false, metadata: {})
└─a: long (is nullable: true, metadata: {})
"
    );

    // Top-level metadata columns appended via add_metadata_column render with their
    // spec name in the `delta.metadataSpec` metadata entry.
    let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
    let schema = schema
        .add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?
        .add_metadata_column("row_id", MetadataColumnSpec::RowId)?
        .add_metadata_column("row_commit_version", MetadataColumnSpec::RowCommitVersion)?;
    assert_eq!(schema.to_string(), "struct:
├─regular_col: string (is nullable: true, metadata: {})
├─row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
├─row_id: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_id\")})
└─row_commit_version: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_commit_version\")})
");
    Ok(())
}
3918
#[test]
fn test_builder_empty() {
    // Building without adding any fields yields a struct with no fields.
    let empty = StructType::builder().build().unwrap();
    assert_eq!(empty.num_fields(), 0);
}
3924
#[test]
fn test_builder_add_fields() {
    // Fields added through the builder keep their insertion order.
    let schema = StructType::builder()
        .add_field(StructField::new("id", DataType::INTEGER, false))
        .add_field(StructField::new("name", DataType::STRING, true))
        .build()
        .unwrap();

    assert_eq!(schema.num_fields(), 2);
    for (index, expected) in ["id", "name"].iter().enumerate() {
        let field = schema.field_at_index(index).unwrap();
        assert_eq!(field.name(), *expected);
    }
}
3937
#[test]
fn test_builder_from_schema() {
    // A builder seeded from an existing schema keeps its fields and appends new ones after them.
    let seed = StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let extra = StructField::new("name", DataType::STRING, true);

    let extended = StructTypeBuilder::from_schema(&seed)
        .add_field(extra)
        .build()
        .unwrap();

    assert_eq!(extended.num_fields(), 2);
    for (index, expected) in ["id", "name"].iter().enumerate() {
        assert_eq!(extended.field_at_index(index).unwrap().name(), *expected);
    }
}
3952
#[test]
fn test_parquet_field_id_key_value() {
    // Pins the string form of the ParquetFieldId metadata key.
    let key: &str = ColumnMetadataKey::ParquetFieldId.as_ref();
    assert_eq!(key, "parquet.field.id");
}
3963
#[test]
fn test_with_field_inserted_empty_struct() {
    // Appending (anchor = None) to an empty struct yields a single-field struct.
    let empty = StructType::try_new([]).unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let result = empty
        .with_field_inserted_after(None, age)
        .expect("with field inserted should produce a valid schema");

    assert_eq!(result.num_fields(), 1);
    let only = result.field_at_index(0).unwrap();
    assert_eq!(only.name(), "age");
}
3973
#[test]
fn test_with_field_inserted() {
    // "age" is placed immediately after the "id" anchor, ahead of "name".
    let original = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = original
        .with_field_inserted_after(Some("id"), age)
        .expect("with field inserted should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    for (index, expected) in ["id", "age", "name"].iter().enumerate() {
        assert_eq!(updated.field_at_index(index).unwrap().name(), *expected);
    }
}
3989
#[test]
fn test_with_field_inserted_append_to_end() {
    // With no anchor, with_field_inserted_after appends the new field at the end.
    let original = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = original
        .with_field_inserted_after(None, age)
        .expect("with field inserted should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    for (index, expected) in ["id", "name", "age"].iter().enumerate() {
        assert_eq!(updated.field_at_index(index).unwrap().name(), *expected);
    }
}
4006
#[test]
fn test_with_field_inserted_after_non_existent_field() {
    // Anchoring on a field that does not exist is rejected.
    let original =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let candidate = StructField::new("name", DataType::STRING, true);

    let outcome = original.with_field_inserted_after(Some("nonexistent"), candidate);
    assert!(outcome.is_err());
}
4017
#[test]
fn test_with_field_inserted_after_duplicate_field() {
    // Inserting a field whose name is already present is rejected with a descriptive error.
    let original = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let duplicate_id = StructField::new("id", DataType::STRING, true);

    let outcome = original.with_field_inserted_after(Some("name"), duplicate_id);
    assert!(outcome.is_err());
    assert_result_error_with_message(outcome, "Field id already exists");
}
4032
#[test]
fn test_with_field_inserted_before() {
    // "age" is placed immediately before the "name" anchor.
    let original = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = original
        .with_field_inserted_before(Some("name"), age)
        .expect("with field inserted before should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    for (index, expected) in ["id", "age", "name"].iter().enumerate() {
        assert_eq!(updated.field_at_index(index).unwrap().name(), *expected);
    }
}
4051
#[test]
fn test_with_field_inserted_before_duplicate_field() {
    // Inserting (before an anchor) a field whose name already exists is rejected.
    let original = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let duplicate_id = StructField::new("id", DataType::STRING, true);

    let outcome = original.with_field_inserted_before(Some("name"), duplicate_id);
    assert!(outcome.is_err());
    assert_result_error_with_message(outcome, "Field id already exists");
}
4066
#[test]
fn test_with_field_inserted_before_at_beginning() {
    // With no anchor, with_field_inserted_before prepends the new field.
    let original = StructType::try_new([
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("name", DataType::STRING, true),
    ])
    .unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let updated = original
        .with_field_inserted_before(None, age)
        .expect("with field inserted before should produce a valid schema");

    assert_eq!(updated.num_fields(), 3);
    for (index, expected) in ["age", "id", "name"].iter().enumerate() {
        assert_eq!(updated.field_at_index(index).unwrap().name(), *expected);
    }
}
4082
#[test]
fn test_with_field_inserted_before_non_existent_field() {
    // Anchoring on a field that does not exist is rejected.
    let original =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let candidate = StructField::new("name", DataType::STRING, true);

    let outcome = original.with_field_inserted_before(Some("nonexistent"), candidate);
    assert!(outcome.is_err());
}
4093
#[test]
fn test_with_field_inserted_before_empty_struct() {
    // Prepending (anchor = None) to an empty struct yields a single-field struct.
    let empty = StructType::try_new([]).unwrap();
    let age = StructField::new("age", DataType::STRING, true);

    let result = empty
        .with_field_inserted_before(None, age)
        .expect("with field inserted before on empty struct should succeed");

    assert_eq!(result.num_fields(), 1);
    let only = result.field_at_index(0).unwrap();
    assert_eq!(only.name(), "age");
}
4103
#[test]
fn test_with_field_removed() {
    // Removing the only field leaves an empty struct.
    let single =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let pruned = single.with_field_removed("id");
    assert_eq!(pruned.num_fields(), 0);
}
4111
#[test]
fn test_with_field_removed_non_existent_field() {
    // Removing an unknown field is a no-op rather than an error.
    let single =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let unchanged = single.with_field_removed("nonexistent");

    assert_eq!(unchanged.num_fields(), 1);
    let survivor = unchanged.field_at_index(0).unwrap();
    assert_eq!(survivor.name(), "id");
}
4120
#[test]
fn test_with_field_replaced() {
    // Replacing "id" swaps it for the new field in the same position.
    let single =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let replacement = StructField::new("name", DataType::STRING, true);

    let swapped = single.with_field_replaced("id", replacement).unwrap();

    assert_eq!(swapped.num_fields(), 1);
    let only = swapped.field_at_index(0).unwrap();
    assert_eq!(only.name(), "name");
}
4132
#[test]
fn test_with_field_replaced_non_existent_field() {
    // Replacing a field that does not exist is an error.
    let single =
        StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
    let replacement = StructField::new("name", DataType::STRING, true);

    let outcome = single.with_field_replaced("nonexistent", replacement);
    assert!(outcome.is_err(), "Expected error for non-existent field");
}
4143
4144 fn walk_test_schema() -> StructType {
4146 let l3 = StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)]);
4147 let l2 = StructType::new_unchecked([StructField::new(
4148 "b",
4149 DataType::Struct(Box::new(l3)),
4150 false,
4151 )]);
4152 StructType::new_unchecked([StructField::new("a", DataType::Struct(Box::new(l2)), false)])
4153 }
4154
4155 #[rstest::rstest]
4156 #[case::single_level(vec!["a"], vec!["a"], DataType::Struct(Box::new(
4157 StructType::new_unchecked([StructField::new("b", DataType::Struct(Box::new(
4158 StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4159 )), false)])
4160 )))]
4161 #[case::nested_2(vec!["a", "b"], vec!["a", "b"], DataType::Struct(Box::new(
4162 StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4163 )))]
4164 #[case::nested_3(vec!["a", "b", "c"], vec!["a", "b", "c"], DataType::DOUBLE)]
4165 #[test]
4166 fn test_walk_column_fields_happy(
4167 #[case] col_path: Vec<&str>,
4168 #[case] expected_names: Vec<&str>,
4169 #[case] expected_leaf_type: DataType,
4170 ) {
4171 let schema = walk_test_schema();
4172 let fields = schema
4173 .walk_column_fields(&ColumnName::new(col_path.iter().copied()))
4174 .unwrap();
4175 assert_eq!(fields.len(), expected_names.len());
4176 for (field, name) in fields.iter().zip(expected_names.iter()) {
4177 assert_eq!(field.name(), *name);
4178 }
4179 assert_eq!(fields.last().unwrap().data_type(), &expected_leaf_type);
4180 }
4181
4182 #[rstest::rstest]
4183 #[case::empty_path(vec![], "Column path cannot be empty")]
4184 #[case::not_found_top(vec!["x"], "not found in schema")]
4185 #[case::not_found_nested(vec!["a", "x"], "not found in schema")]
4186 #[case::intermediate_not_struct(vec!["a", "b", "c", "d"], "not a struct type")]
4187 #[test]
4188 fn test_walk_column_fields_error(#[case] col_path: Vec<&str>, #[case] expected_error: &str) {
4189 let schema = walk_test_schema();
4190 let result = schema.walk_column_fields(&ColumnName::new(col_path.iter().copied()));
4191 assert_result_error_with_message(result, expected_error);
4192 }
4193
#[test]
fn test_normalize_column_names_to_schema_casing() {
    let inner =
        StructType::new_unchecked(vec![StructField::new("City", DataType::STRING, false)]);
    let schema = StructType::new_unchecked(vec![
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("EventDate", DataType::DATE, false),
        StructField::new("Address", DataType::Struct(Box::new(inner)), false),
    ]);

    // (input path, expected path). The cases cover:
    // - top-level case fix,
    // - nested case fix,
    // - an already-matching name,
    // - an unknown column, which is passed through unchanged.
    let cases: [(&[&str], &[&str]); 4] = [
        (&["eventdate"], &["EventDate"]),
        (&["address", "city"], &["Address", "City"]),
        (&["id"], &["id"]),
        (&["nonexistent"], &["nonexistent"]),
    ];
    for (input, expected) in cases {
        let cols = vec![ColumnName::new(input.iter().copied())];
        assert_eq!(
            normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
            expected
        );
    }
}
4232}