1use std::borrow::Cow;
4use std::collections::{HashMap, HashSet};
5use std::fmt::{Debug, Display, Formatter};
6use std::iter::{DoubleEndedIterator, FusedIterator};
7use std::str::FromStr;
8use std::sync::{Arc, LazyLock};
9
10use delta_kernel_derive::internal_api;
11use indexmap::IndexMap;
12use itertools::Itertools;
13use serde::{Deserialize, Serialize};
14use tracing::warn;
15
16pub(crate) use crate::expressions::{column_name, ColumnName};
18use crate::reserved_field_ids::FILE_NAME;
19use crate::table_features::{
20 validate_and_extract_column_mapping_annotations, validate_column_mapping_id, ColumnMappingMode,
21 SeenColumnMappingAnnotations,
22};
23use crate::transforms::{transform_output_type, SchemaTransform};
24use crate::utils::require;
25use crate::{DeltaResult, Error};
26
27pub(crate) mod compare;
28#[cfg(feature = "schema-diff")]
29pub(crate) mod diff;
30
31#[cfg(feature = "internal-api")]
32pub mod derive_macro_utils;
33#[cfg(not(feature = "internal-api"))]
34pub(crate) mod derive_macro_utils;
35pub(crate) mod validation;
36pub(crate) mod variant_utils;
37
38pub type Schema = StructType;
39pub type SchemaRef = Arc<StructType>;
40
41#[internal_api]
44pub(crate) trait ToSchema {
45 fn to_schema() -> StructType;
46}
47
48#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
49#[serde(untagged)]
50pub enum MetadataValue {
51 Number(i64),
52 String(String),
53 Boolean(bool),
54 Other(serde_json::Value),
59}
60
61impl Display for MetadataValue {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 match self {
64 MetadataValue::Number(n) => write!(f, "{n}"),
65 MetadataValue::String(s) => write!(f, "{s}"),
66 MetadataValue::Boolean(b) => write!(f, "{b}"),
67 MetadataValue::Other(v) => write!(f, "{v}"), }
69 }
70}
71
72impl From<String> for MetadataValue {
73 fn from(value: String) -> Self {
74 Self::String(value)
75 }
76}
77
78impl From<&String> for MetadataValue {
79 fn from(value: &String) -> Self {
80 Self::String(value.clone())
81 }
82}
83
84impl From<&str> for MetadataValue {
85 fn from(value: &str) -> Self {
86 Self::String(value.to_string())
87 }
88}
89
90impl From<i64> for MetadataValue {
91 fn from(value: i64) -> Self {
92 Self::Number(value)
93 }
94}
95
96impl From<bool> for MetadataValue {
97 fn from(value: bool) -> Self {
98 Self::Boolean(value)
99 }
100}
101
/// Well-known keys for field metadata. The string form of each key comes from this
/// type's `AsRef<str>` implementation.
#[derive(Debug)]
pub enum ColumnMetadataKey {
    /// `delta.columnMapping.id`
    ColumnMappingId,
    /// `delta.columnMapping.physicalName`
    ColumnMappingPhysicalName,
    /// `delta.columnMapping.nested.ids`
    ColumnMappingNestedIds,
    /// `parquet.field.id`
    ParquetFieldId,
    /// `parquet.field.nested.ids`
    ParquetFieldNestedIds,
    /// `delta.generationExpression`
    GenerationExpression,
    /// `delta.identity.start`
    IdentityStart,
    /// `delta.identity.step`
    IdentityStep,
    /// `delta.identity.highWaterMark`
    IdentityHighWaterMark,
    /// `delta.identity.allowExplicitInsert`
    IdentityAllowExplicitInsert,
    /// `delta.isInternalColumn`
    InternalColumn,
    /// `delta.invariants`
    Invariants,
    /// `delta.metadataSpec`
    MetadataSpec,
}
136
137impl AsRef<str> for ColumnMetadataKey {
138 fn as_ref(&self) -> &str {
139 match self {
140 Self::ColumnMappingId => "delta.columnMapping.id",
141 Self::ColumnMappingPhysicalName => "delta.columnMapping.physicalName",
142 Self::ColumnMappingNestedIds => "delta.columnMapping.nested.ids",
143 Self::ParquetFieldId => "parquet.field.id",
147 Self::ParquetFieldNestedIds => "parquet.field.nested.ids",
152 Self::GenerationExpression => "delta.generationExpression",
153 Self::IdentityAllowExplicitInsert => "delta.identity.allowExplicitInsert",
154 Self::IdentityHighWaterMark => "delta.identity.highWaterMark",
155 Self::IdentityStart => "delta.identity.start",
156 Self::IdentityStep => "delta.identity.step",
157 Self::InternalColumn => "delta.isInternalColumn",
158 Self::Invariants => "delta.invariants",
159 Self::MetadataSpec => "delta.metadataSpec",
160 }
161 }
162}
163
/// The kinds of metadata column that can be requested in a schema.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MetadataColumnSpec {
    /// Row index column (`row_index`).
    RowIndex,
    /// Row id column (`row_id`).
    RowId,
    /// Row commit version column (`row_commit_version`).
    RowCommitVersion,
    /// File path column (`_file`).
    FilePath,
}
174
175impl MetadataColumnSpec {
176 pub fn text_value(&self) -> &'static str {
178 match self {
179 Self::RowIndex => "row_index",
180 Self::RowId => "row_id",
181 Self::RowCommitVersion => "row_commit_version",
182 Self::FilePath => "_file",
183 }
184 }
185
186 pub fn data_type(&self) -> DataType {
188 match self {
189 Self::RowIndex => DataType::LONG,
190 Self::RowId => DataType::LONG,
191 Self::RowCommitVersion => DataType::LONG,
192 Self::FilePath => DataType::STRING,
193 }
194 }
195
196 pub fn nullable(&self) -> bool {
198 match self {
199 Self::RowIndex => false,
200 Self::RowId => false,
201 Self::RowCommitVersion => false,
202 Self::FilePath => false,
203 }
204 }
205
206 pub fn reserved_field_id(&self) -> Option<i64> {
208 match self {
209 Self::FilePath => Some(FILE_NAME),
210 _ => None,
211 }
212 }
213}
214
215impl FromStr for MetadataColumnSpec {
216 type Err = Error;
217
218 fn from_str(s: &str) -> Result<Self, Self::Err> {
219 match s {
220 "row_index" => Ok(Self::RowIndex),
221 "row_id" => Ok(Self::RowId),
222 "row_commit_version" => Ok(Self::RowCommitVersion),
223 "_file" => Ok(Self::FilePath),
224 _ => Err(Error::Schema(format!("Unknown metadata column spec: {s}"))),
225 }
226 }
227}
228
229#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
230pub struct StructField {
231 pub name: String,
233 #[serde(rename = "type")]
235 pub data_type: DataType,
236 pub nullable: bool,
238 pub metadata: HashMap<String, MetadataValue>,
240}
241
/// Column-mapping annotations already present on a field, borrowed from its metadata.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ExistingColumnMappingAnnotations<'a> {
    /// Value of the `delta.columnMapping.id` annotation, if present.
    pub id: Option<i64>,
    /// Value of the `delta.columnMapping.physicalName` annotation, if present.
    pub physical_name: Option<&'a str>,
}
253
254impl StructField {
255 const DEFAULT_ROW_INDEX_COLUMN_NAME: &'static str = "_metadata.row_index";
260
261 pub fn new(name: impl Into<String>, data_type: impl Into<DataType>, nullable: bool) -> Self {
267 Self {
268 name: name.into(),
269 data_type: data_type.into(),
270 nullable,
271 metadata: HashMap::default(),
272 }
273 }
274
275 pub fn nullable(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
277 Self::new(name, data_type, true)
278 }
279
280 pub fn not_null(name: impl Into<String>, data_type: impl Into<DataType>) -> Self {
282 Self::new(name, data_type, false)
283 }
284
285 pub fn create_metadata_column(name: impl Into<String>, spec: MetadataColumnSpec) -> Self {
287 let mut metadata = HashMap::new();
288 metadata.insert(
289 ColumnMetadataKey::MetadataSpec.as_ref().to_string(),
290 MetadataValue::String(spec.text_value().to_string()),
291 );
292
293 Self {
294 name: name.into(),
295 data_type: spec.data_type(),
296 nullable: spec.nullable(),
297 metadata,
298 }
299 }
300
301 pub fn default_row_index_column() -> &'static StructField {
303 static DEFAULT_ROW_INDEX_COLUMN: LazyLock<StructField> = LazyLock::new(|| {
304 StructField::create_metadata_column(
305 StructField::DEFAULT_ROW_INDEX_COLUMN_NAME,
306 MetadataColumnSpec::RowIndex,
307 )
308 });
309 &DEFAULT_ROW_INDEX_COLUMN
310 }
311
312 pub fn with_metadata(
318 mut self,
319 metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
320 ) -> Self {
321 self.metadata = metadata
322 .into_iter()
323 .map(|(k, v)| (k.into(), v.into()))
324 .collect();
325 self
326 }
327
328 pub fn add_metadata(
330 mut self,
331 metadata: impl IntoIterator<Item = (impl Into<String>, impl Into<MetadataValue>)>,
332 ) -> Self {
333 self.metadata
334 .extend(metadata.into_iter().map(|(k, v)| (k.into(), v.into())));
335 self
336 }
337
338 pub fn is_metadata_column(&self) -> bool {
340 self.metadata
341 .contains_key(ColumnMetadataKey::MetadataSpec.as_ref())
342 }
343
344 pub fn get_metadata_column_spec(&self) -> Option<MetadataColumnSpec> {
346 match self
347 .metadata
348 .get(ColumnMetadataKey::MetadataSpec.as_ref())?
349 {
350 MetadataValue::String(s) => MetadataColumnSpec::from_str(s).ok(),
351 _ => None,
352 }
353 }
354
355 pub fn is_internal_column(&self) -> bool {
359 matches!(
360 self.metadata
361 .get(ColumnMetadataKey::InternalColumn.as_ref()),
362 Some(MetadataValue::Boolean(true))
363 )
364 }
365
366 pub fn as_internal_column(self) -> Self {
368 self.add_metadata(vec![(
369 ColumnMetadataKey::InternalColumn.as_ref().to_string(),
370 MetadataValue::Boolean(true),
371 )])
372 }
373
374 pub fn get_config_value(&self, key: &ColumnMetadataKey) -> Option<&MetadataValue> {
375 self.metadata.get(key.as_ref())
376 }
377
378 pub fn column_mapping_id(&self) -> Option<i64> {
381 match self.get_config_value(&ColumnMetadataKey::ColumnMappingId)? {
382 MetadataValue::Number(n) => Some(*n),
383 _ => None,
384 }
385 }
386
387 pub(crate) fn validate_and_extract_existing_column_mapping_annotations(
408 &self,
409 ) -> DeltaResult<ExistingColumnMappingAnnotations<'_>> {
410 let id = match self.get_config_value(&ColumnMetadataKey::ColumnMappingId) {
411 Some(MetadataValue::Number(n)) => {
412 validate_column_mapping_id(*n)
413 .map_err(|e| Error::schema(format!("Field '{}': {e}", self.name)))?;
414 Some(*n)
415 }
416 None => None,
417 Some(_) => {
418 return Err(Error::schema(format!(
419 "Field '{}' has a non-numeric `{}` annotation",
420 self.name,
421 ColumnMetadataKey::ColumnMappingId.as_ref(),
422 )));
423 }
424 };
425 let physical_name =
426 match self.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) {
427 Some(MetadataValue::String(s)) if s.is_empty() => {
428 return Err(Error::schema(format!(
429 "Field '{}' has an empty `{}` annotation",
430 self.name,
431 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
432 )));
433 }
434 Some(MetadataValue::String(s)) => Some(s.as_str()),
435 None => None,
436 Some(_) => {
437 return Err(Error::schema(format!(
438 "Field '{}' has a non-string `{}` annotation",
439 self.name,
440 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
441 )));
442 }
443 };
444 Ok(ExistingColumnMappingAnnotations { id, physical_name })
445 }
446
447 #[cfg(any(test, feature = "test-utils"))]
451 pub fn collect_column_mapping_ids(&self) -> Vec<i64> {
452 struct CollectIds(Vec<i64>);
453 impl<'a> SchemaTransform<'a> for CollectIds {
454 transform_output_type!(|'a, T| ());
455
456 fn transform_struct_field(&mut self, field: &'a StructField) {
457 if let Some(id) = field.column_mapping_id() {
458 self.0.push(id);
459 }
460 self.recurse_into_struct_field(field)
461 }
462 }
463 let mut visitor = CollectIds(Vec::new());
464 visitor.transform_struct_field(self);
465 visitor.0
466 }
467
468 #[internal_api]
478 pub(crate) fn physical_name(&self, column_mapping_mode: ColumnMappingMode) -> &str {
479 match column_mapping_mode {
480 ColumnMappingMode::None => &self.name,
481 ColumnMappingMode::Id | ColumnMappingMode::Name => {
482 match self
483 .metadata
484 .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
485 {
486 Some(MetadataValue::String(physical_name)) => physical_name,
487 _ => &self.name,
488 }
489 }
490 }
491 }
492
493 pub(crate) fn has_physical_name_annotation(&self) -> bool {
496 matches!(
497 self.metadata
498 .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()),
499 Some(MetadataValue::String(_))
500 )
501 }
502
503 pub(crate) fn has_id_annotation(&self) -> bool {
506 matches!(
507 self.metadata
508 .get(ColumnMetadataKey::ColumnMappingId.as_ref()),
509 Some(MetadataValue::Number(_))
510 )
511 }
512
513 pub fn with_name(&self, new_name: impl Into<String>) -> Self {
516 StructField {
517 name: new_name.into(),
518 data_type: self.data_type().clone(),
519 nullable: self.nullable,
520 metadata: self.metadata.clone(),
521 }
522 }
523
524 #[inline]
525 pub fn name(&self) -> &String {
526 &self.name
527 }
528
529 #[inline]
530 pub fn is_nullable(&self) -> bool {
531 self.nullable
532 }
533
534 #[inline]
535 pub const fn data_type(&self) -> &DataType {
536 &self.data_type
537 }
538
539 #[inline]
540 pub const fn metadata(&self) -> &HashMap<String, MetadataValue> {
541 &self.metadata
542 }
543
544 pub fn metadata_with_string_values(&self) -> HashMap<String, String> {
547 self.metadata
548 .iter()
549 .map(|(key, val)| (key.clone(), val.to_string()))
550 .collect()
551 }
552
553 #[internal_api]
573 pub(crate) fn make_physical(
574 &self,
575 column_mapping_mode: ColumnMappingMode,
576 ) -> DeltaResult<Self> {
577 MakePhysical::new(column_mapping_mode)
578 .transform_struct_field(self)
579 .map(|f| f.into_owned())
580 }
581
582 pub(crate) fn has_invariants(&self) -> bool {
583 self.metadata
584 .contains_key(ColumnMetadataKey::Invariants.as_ref())
585 }
586
587 fn logical_to_physical_metadata(
596 &self,
597 column_mapping_mode: ColumnMappingMode,
598 ) -> HashMap<String, MetadataValue> {
599 let mut base_metadata = self.metadata.clone();
600 let physical_name_key = ColumnMetadataKey::ColumnMappingPhysicalName.as_ref();
601 let field_id_key = ColumnMetadataKey::ColumnMappingId.as_ref();
602 let parquet_field_id_key = ColumnMetadataKey::ParquetFieldId.as_ref();
603 let field_id = base_metadata.get(ColumnMetadataKey::ColumnMappingId.as_ref());
604 match column_mapping_mode {
605 ColumnMappingMode::Id => {
606 let Some(MetadataValue::Number(fid)) = field_id else {
607 warn!("StructField with name {} is missing field id in the Id column mapping mode", self.name());
610 debug_assert!(false);
611 return base_metadata;
612 };
613 base_metadata.insert(
615 parquet_field_id_key.to_string(),
616 MetadataValue::Number(*fid),
617 );
618 debug_assert!(base_metadata.contains_key(physical_name_key));
620 }
621 ColumnMappingMode::Name => {
622 debug_assert!(base_metadata.contains_key(physical_name_key));
624 debug_assert!(base_metadata.contains_key(field_id_key));
625
626 let Some(MetadataValue::Number(fid)) = field_id else {
630 warn!("StructField with name {} is missing field id in the Name column mapping mode", self.name());
631 debug_assert!(false);
632 return base_metadata;
633 };
634 base_metadata.insert(
635 parquet_field_id_key.to_string(),
636 MetadataValue::Number(*fid),
637 );
638 }
640 ColumnMappingMode::None => {
641 base_metadata.remove(physical_name_key);
642 base_metadata.remove(field_id_key);
643 base_metadata.remove(parquet_field_id_key);
644 }
646 }
647 base_metadata
648 }
649}
650
651impl Display for StructField {
652 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
653 let mut metadata_str = String::from("{");
654 let mut first = true;
655 for (k, v) in self.metadata.iter() {
656 if !first {
657 metadata_str.push_str(", ");
658 }
659 first = false;
660 metadata_str.push_str(&format!("{k}: {v:?}"));
661 }
662 metadata_str.push('}');
663 write!(
664 f,
665 "{}: {} (is nullable: {}, metadata: {})",
666 self.name, self.data_type, self.nullable, metadata_str,
667 )
668 }
669}
670
671#[derive(Debug, PartialEq, Clone, Eq)]
674pub struct StructType {
675 type_name: String,
676 fields: IndexMap<String, StructField>,
681 metadata_columns: HashMap<MetadataColumnSpec, usize>,
685}
686
687pub struct StructTypeBuilder {
688 fields: IndexMap<String, StructField>,
689}
690
691impl Default for StructTypeBuilder {
692 fn default() -> Self {
693 Self::new()
694 }
695}
696
697impl StructTypeBuilder {
698 pub fn new() -> Self {
699 Self {
700 fields: IndexMap::new(),
701 }
702 }
703
704 pub fn from_schema(schema: &StructType) -> Self {
705 Self {
706 fields: schema.fields.clone(),
707 }
708 }
709
710 pub fn add_field(mut self, field: StructField) -> Self {
711 self.fields.insert(field.name.clone(), field);
712 self
713 }
714
715 pub fn build(self) -> DeltaResult<StructType> {
716 StructType::try_new(self.fields.into_values())
717 }
718
719 pub fn build_arc_unchecked(self) -> Arc<StructType> {
720 Arc::new(StructType::new_unchecked(self.fields.into_values()))
721 }
722}
723
724impl StructType {
725 pub fn try_new(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
733 let mut field_map = IndexMap::new();
734 let mut metadata_columns = HashMap::new();
735 let mut seen_lowercase_names = HashSet::new();
736
737 for (i, field) in fields.into_iter().enumerate() {
739 if !matches!(field.data_type, DataType::Primitive(_)) {
741 Self::ensure_no_metadata_columns_in_field(&field)?;
742 }
743
744 if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
746 if metadata_columns.insert(metadata_column_spec, i).is_some() {
747 return Err(Error::schema(format!(
748 "Duplicate metadata column: {metadata_column_spec:?}",
749 )));
750 }
751 }
752
753 let key = field.name.to_lowercase();
756 if !seen_lowercase_names.insert(key) {
757 return Err(Error::schema(format!(
758 "Duplicate field name (case-insensitive): '{}'",
759 field.name
760 )));
761 }
762
763 field_map.insert(field.name.clone(), field);
764 }
765
766 Ok(Self {
767 type_name: "struct".into(),
768 fields: field_map,
769 metadata_columns,
770 })
771 }
772
773 pub fn try_from_results<E: Into<Error>>(
778 fields: impl IntoIterator<Item = Result<StructField, E>>,
779 ) -> DeltaResult<Self> {
780 fields
781 .into_iter()
782 .map(|result| result.map_err(Into::into))
783 .process_results(|iter| Self::try_new(iter))?
784 }
785
786 pub fn builder() -> StructTypeBuilder {
787 StructTypeBuilder::new()
788 }
789
790 #[internal_api]
795 pub(crate) fn new_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
796 let mut field_map = IndexMap::new();
797 let mut metadata_columns = HashMap::new();
798
799 for (i, field) in fields.into_iter().enumerate() {
800 if let Some(metadata_column_spec) = field.get_metadata_column_spec() {
801 metadata_columns.insert(metadata_column_spec, i);
802 }
803 field_map.insert(field.name.clone(), field);
804 }
805
806 Self {
807 type_name: "struct".into(),
808 fields: field_map,
809 metadata_columns,
810 }
811 }
812
813 pub fn project_as_struct(&self, names: &[impl AsRef<str>]) -> DeltaResult<StructType> {
817 let fields = names.iter().map(|name| {
818 self.fields
819 .get(name.as_ref())
820 .cloned()
821 .ok_or_else(|| Error::missing_column(name.as_ref()))
822 });
823 Self::try_from_results(fields)
824 }
825
826 pub fn project(&self, names: &[impl AsRef<str>]) -> DeltaResult<SchemaRef> {
830 let struct_type = self.project_as_struct(names)?;
831 Ok(Arc::new(struct_type))
832 }
833
834 pub fn add(&self, fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
836 Self::try_new(self.fields.values().cloned().chain(fields))
837 }
838
839 pub fn add_metadata_column(
841 &self,
842 name: impl Into<String>,
843 spec: MetadataColumnSpec,
844 ) -> DeltaResult<Self> {
845 self.add([StructField::create_metadata_column(name, spec)])
846 }
847
848 pub fn index_of(&self, name: impl AsRef<str>) -> Option<usize> {
850 self.fields.get_index_of(name.as_ref())
851 }
852
853 pub fn index_of_metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&usize> {
855 self.metadata_columns.get(spec)
856 }
857
858 pub fn contains(&self, name: impl AsRef<str>) -> bool {
860 self.fields.contains_key(name.as_ref())
861 }
862
863 pub fn contains_metadata_column(&self, spec: &MetadataColumnSpec) -> bool {
865 self.metadata_columns.contains_key(spec)
866 }
867
868 pub fn field(&self, name: impl AsRef<str>) -> Option<&StructField> {
870 self.fields.get(name.as_ref())
871 }
872
873 #[internal_api]
882 pub(crate) fn walk_column_fields<'a>(
883 &'a self,
884 col: &ColumnName,
885 ) -> DeltaResult<Vec<&'a StructField>> {
886 self.walk_column_fields_by(col, |s, name| s.field(name))
887 }
888
889 pub(crate) fn walk_column_fields_by<'a, F>(
895 &'a self,
896 col: &ColumnName,
897 find_field: F,
898 ) -> DeltaResult<Vec<&'a StructField>>
899 where
900 F: for<'b> Fn(&'b StructType, &str) -> Option<&'b StructField>,
901 {
902 let path = col.path();
903 if path.is_empty() {
904 return Err(Error::generic("Column path cannot be empty"));
905 }
906 let mut current_struct = self;
907 let mut fields = Vec::with_capacity(path.len());
908 for (i, field_name) in path.iter().enumerate() {
909 let field = find_field(current_struct, field_name).ok_or_else(|| {
910 Error::generic(format!(
911 "Could not resolve column '{col}': field '{field_name}' not found in schema"
912 ))
913 })?;
914 fields.push(field);
915 if i < path.len() - 1 {
916 let DataType::Struct(inner) = field.data_type() else {
917 return Err(Error::generic(format!(
918 "Cannot resolve column '{col}': intermediate field '{field_name}' \
919 is not a struct type"
920 )));
921 };
922 current_struct = inner;
923 }
924 }
925 Ok(fields)
926 }
927
928 pub fn field_with_index(&self, name: impl AsRef<str>) -> Option<(usize, &StructField)> {
930 self.fields
931 .get_full(name.as_ref())
932 .map(|(index, _, field)| (index, field))
933 }
934
935 pub fn field_at_index(&self, index: usize) -> Option<&StructField> {
937 self.fields.get_index(index).map(|(_, field)| field)
938 }
939
940 pub fn fields(
942 &self,
943 ) -> impl ExactSizeIterator<Item = &StructField> + DoubleEndedIterator + FusedIterator {
944 self.fields.values()
945 }
946
947 pub fn into_fields(
949 self,
950 ) -> impl ExactSizeIterator<Item = StructField> + DoubleEndedIterator + FusedIterator {
951 self.fields.into_values()
952 }
953
954 pub(crate) fn field_map_mut(&mut self) -> &mut IndexMap<String, StructField> {
956 &mut self.fields
957 }
958
959 #[cfg(any(test, feature = "test-utils"))]
982 #[allow(clippy::panic, clippy::expect_used)]
983 pub fn field_at_path<'a>(&'a self, path: &[String]) -> &'a StructField {
984 fn find_ci<'a>(
985 mut fields: impl Iterator<Item = &'a StructField>,
986 name: &str,
987 ) -> &'a StructField {
988 let lowered = name.to_lowercase();
989 fields
990 .find(|f| f.name().to_lowercase() == lowered)
991 .unwrap_or_else(|| panic!("field '{name}' not found"))
992 }
993 let (first, rest) = path.split_first().expect("non-empty path");
994 let mut field = find_ci(self.fields(), first);
995 for seg in rest {
996 let DataType::Struct(s) = field.data_type() else {
997 panic!("expected struct at intermediate segment '{seg}'");
998 };
999 field = find_ci(s.fields(), seg);
1000 }
1001 field
1002 }
1003
1004 pub fn field_names(&self) -> impl ExactSizeIterator<Item = &String> {
1006 self.fields.keys()
1007 }
1008
1009 pub fn num_fields(&self) -> usize {
1011 self.fields.len()
1013 }
1014
1015 #[allow(unused)] #[internal_api]
1023 pub(crate) fn total_struct_fields(&self) -> usize {
1024 fn count_data_type(dt: &DataType) -> usize {
1025 match dt {
1026 DataType::Struct(inner) => inner.total_struct_fields(),
1027 DataType::Array(array) => count_data_type(array.element_type()),
1028 DataType::Map(map) => {
1029 count_data_type(map.key_type()) + count_data_type(map.value_type())
1030 }
1031 _ => 0,
1032 }
1033 }
1034 self.fields()
1035 .map(|field| 1 + count_data_type(field.data_type()))
1036 .sum()
1037 }
1038
1039 pub fn metadata_column(&self, spec: &MetadataColumnSpec) -> Option<&StructField> {
1041 self.metadata_columns
1042 .get(spec)
1043 .and_then(|index| self.fields.get_index(*index).map(|(_, field)| field))
1044 }
1045
1046 pub fn metadata_columns(&self) -> impl Iterator<Item = &StructField> {
1048 self.metadata_columns
1049 .values()
1050 .filter_map(|index| self.fields.get_index(*index).map(|(_, field)| field))
1051 }
1052
1053 #[allow(unused)]
1060 #[internal_api]
1061 pub(crate) fn leaves<'s>(&self, own_name: impl Into<Option<&'s str>>) -> ColumnNamesAndTypes {
1062 let mut get_leaves = GetSchemaLeaves::new(own_name.into());
1063 get_leaves.transform_struct(self);
1064 (get_leaves.names, get_leaves.types).into()
1065 }
1066
1067 #[internal_api]
1074 pub(crate) fn make_physical(
1075 &self,
1076 column_mapping_mode: ColumnMappingMode,
1077 ) -> DeltaResult<Self> {
1078 let mut transformer = MakePhysical::new(column_mapping_mode);
1079 transformer.transform_struct(self).map(|s| s.into_owned())
1080 }
1081
1082 pub(crate) fn ensure_no_metadata_columns(
1084 fields: &mut dyn Iterator<Item = &StructField>,
1085 ) -> DeltaResult<()> {
1086 for field in fields {
1087 Self::ensure_no_metadata_columns_in_field(field)?;
1088 }
1089 Ok(())
1090 }
1091
1092 pub(crate) fn ensure_no_metadata_columns_in_field(field: &StructField) -> DeltaResult<()> {
1094 if field.is_metadata_column() {
1095 return Err(Error::schema(
1096 "Metadata columns are only allowed at the top level of a schema".to_string(),
1097 ));
1098 }
1099
1100 match &field.data_type {
1101 DataType::Struct(struct_type) => {
1102 Self::ensure_no_metadata_columns(&mut struct_type.fields().filter(|f| {
1104 !matches!(f.data_type, DataType::Struct(_) | DataType::Variant(_))
1105 }))?;
1106 }
1107 DataType::Array(array_type) => {
1108 if let DataType::Struct(struct_type) = array_type.element_type() {
1109 Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1110 }
1111 }
1112 DataType::Map(map_type) => {
1113 if let DataType::Struct(struct_type) = map_type.key_type() {
1114 Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1115 }
1116 if let DataType::Struct(struct_type) = map_type.value_type() {
1117 Self::ensure_no_metadata_columns(&mut struct_type.fields())?;
1118 }
1119 }
1120 DataType::Primitive(_) | DataType::Variant(_) => {}
1123 };
1124
1125 Ok(())
1126 }
1127
1128 pub fn with_field_inserted_after(
1133 mut self,
1134 after: Option<&str>,
1135 new_field: StructField,
1136 ) -> DeltaResult<Self> {
1137 if self.fields.contains_key(&new_field.name) {
1142 return Err(Error::generic(format!(
1143 "Field {} already exists",
1144 new_field.name
1145 )));
1146 }
1147
1148 let insert_index = after
1149 .map(|after| {
1150 self.fields
1151 .get_index_of(after)
1152 .map(|index| index + 1)
1153 .ok_or_else(|| Error::generic(format!("Field {after} not found")))
1154 })
1155 .unwrap_or_else(|| Ok(self.fields.len()))?;
1156
1157 self.fields
1158 .insert_before(insert_index, new_field.name.clone(), new_field);
1159 Ok(self)
1160 }
1161
1162 pub fn with_field_inserted_before(
1167 mut self,
1168 before: Option<&str>,
1169 new_field: StructField,
1170 ) -> DeltaResult<Self> {
1171 if self.fields.contains_key(&new_field.name) {
1175 return Err(Error::generic(format!(
1176 "Field {} already exists",
1177 new_field.name
1178 )));
1179 }
1180
1181 let index_of_before = before
1182 .map(|before| {
1183 self.fields
1184 .get_index_of(before)
1185 .ok_or_else(|| Error::generic(format!("Field {before} not found")))
1186 })
1187 .unwrap_or_else(|| Ok(0))?;
1188
1189 self.fields
1190 .insert_before(index_of_before, new_field.name.clone(), new_field);
1191 Ok(self)
1192 }
1193
1194 pub fn with_field_removed(mut self, name: &str) -> Self {
1197 self.fields.shift_remove(name);
1198 self
1199 }
1200
1201 pub fn with_fields_filtered(
1204 &self,
1205 predicate: impl Fn(&StructField) -> bool,
1206 ) -> DeltaResult<Self> {
1207 Self::try_new(self.fields().filter(|f| predicate(f)).cloned())
1208 }
1209
1210 pub fn with_fields_filtered_nonempty(
1216 &self,
1217 predicate: impl Fn(&StructField) -> bool,
1218 ) -> DeltaResult<Option<Self>> {
1219 let filtered = self.with_fields_filtered(predicate)?;
1220 if filtered.num_fields() == 0 {
1221 Ok(None)
1222 } else {
1223 Ok(Some(filtered))
1224 }
1225 }
1226
1227 pub fn with_field_replaced(
1230 mut self,
1231 name: &str,
1232 new_field: StructField,
1233 ) -> DeltaResult<StructType> {
1234 let replace_field = self
1235 .fields
1236 .get_mut(name)
1237 .ok_or_else(|| Error::generic(format!("Field {name} not found")))?;
1238
1239 *replace_field = new_field;
1240 Ok(self)
1241 }
1242}
1243
/// Writes the tree-drawing prefix for one line of the schema tree.
///
/// Each entry in `levels` says whether the node at that depth is the last child of its
/// parent. Ancestor depths draw either a vertical rail or blank padding. The deepest level
/// draws the branch glyph (`├─` or, for the last child, `└─`).
fn write_indent(f: &mut Formatter<'_>, levels: &[bool]) -> std::fmt::Result {
    let Some((&own_is_last, ancestors)) = levels.split_last() else {
        return Ok(());
    };
    for &ancestor_is_last in ancestors {
        f.write_str(if ancestor_is_last { " " } else { "│ " })?;
    }
    f.write_str(if own_is_last { "└─" } else { "├─" })
}
1260
1261fn write_struct_type(
1262 st: &StructType,
1263 f: &mut Formatter<'_>,
1264 levels: &mut Vec<bool>,
1265) -> std::fmt::Result {
1266 let len = st.fields.len();
1267
1268 for (i, (_, field)) in st.fields.iter().enumerate() {
1269 let is_last = i + 1 == len;
1270 levels.push(is_last);
1271
1272 write_indent(f, levels)?;
1273 writeln!(f, "{field}")?;
1274
1275 field.data_type.fmt_recursive(f, levels)?;
1276
1277 levels.pop();
1278 }
1279 Ok(())
1280}
1281
1282impl Display for StructType {
1283 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1284 writeln!(f, "{}:", self.type_name)?;
1285 let mut levels = Vec::new();
1286 write_struct_type(self, f, &mut levels)
1287 }
1288}
1289
1290impl IntoIterator for StructType {
1291 type Item = StructField;
1292 type IntoIter = StructFieldIntoIter;
1293
1294 fn into_iter(self) -> Self::IntoIter {
1295 StructFieldIntoIter {
1296 inner: self.fields.into_values(),
1297 }
1298 }
1299}
1300
1301impl<'a> IntoIterator for &'a StructType {
1302 type Item = &'a StructField;
1303 type IntoIter = StructFieldRefIter<'a>;
1304
1305 fn into_iter(self) -> Self::IntoIter {
1306 StructFieldRefIter {
1307 inner: self.fields.values(),
1308 }
1309 }
1310}
1311
1312#[derive(Debug)]
1340pub struct StructFieldIntoIter {
1341 inner: indexmap::map::IntoValues<String, StructField>,
1342}
1343
1344impl Iterator for StructFieldIntoIter {
1345 type Item = StructField;
1346
1347 fn next(&mut self) -> Option<Self::Item> {
1348 self.inner.next()
1349 }
1350
1351 fn size_hint(&self) -> (usize, Option<usize>) {
1352 self.inner.size_hint()
1353 }
1354
1355 fn count(self) -> usize {
1356 self.inner.count()
1357 }
1358
1359 fn last(self) -> Option<Self::Item> {
1360 self.inner.last()
1361 }
1362
1363 fn nth(&mut self, n: usize) -> Option<Self::Item> {
1364 self.inner.nth(n)
1365 }
1366}
1367
1368impl ExactSizeIterator for StructFieldIntoIter {
1369 fn len(&self) -> usize {
1370 self.inner.len()
1371 }
1372}
1373
1374impl FusedIterator for StructFieldIntoIter {}
1375
1376impl DoubleEndedIterator for StructFieldIntoIter {
1377 fn next_back(&mut self) -> Option<Self::Item> {
1378 self.inner.next_back()
1379 }
1380}
1381
1382#[derive(Debug, Clone)]
1423pub struct StructFieldRefIter<'a> {
1424 inner: indexmap::map::Values<'a, String, StructField>,
1425}
1426
1427impl<'a> Iterator for StructFieldRefIter<'a> {
1428 type Item = &'a StructField;
1429
1430 fn next(&mut self) -> Option<Self::Item> {
1431 self.inner.next()
1432 }
1433
1434 fn size_hint(&self) -> (usize, Option<usize>) {
1435 self.inner.size_hint()
1436 }
1437}
1438
1439impl ExactSizeIterator for StructFieldRefIter<'_> {
1440 fn len(&self) -> usize {
1441 self.inner.len()
1442 }
1443}
1444
1445impl FusedIterator for StructFieldRefIter<'_> {}
1446
1447impl DoubleEndedIterator for StructFieldRefIter<'_> {
1448 fn next_back(&mut self) -> Option<Self::Item> {
1449 self.inner.next_back()
1450 }
1451}
1452
1453struct InvariantChecker;
1454
1455impl<'a> SchemaTransform<'a> for InvariantChecker {
1456 transform_output_type!(|'a, T| Result<(), ()>);
1457
1458 fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1459 if field.has_invariants() {
1460 Err(())
1461 } else {
1462 self.recurse_into_struct_field(field)
1463 }
1464 }
1465}
1466
1467pub(crate) fn schema_has_invariants(schema: &Schema) -> bool {
1472 InvariantChecker.transform_struct(schema).is_err()
1473}
1474
1475struct NonNullFieldChecker;
1478
1479impl<'a> SchemaTransform<'a> for NonNullFieldChecker {
1480 transform_output_type!(|'a, T| Result<(), ()>);
1481
1482 fn transform_variant(&mut self, _stype: &'a StructType) -> Result<(), ()> {
1486 Ok(())
1487 }
1488
1489 fn transform_struct_field(&mut self, field: &'a StructField) -> Result<(), ()> {
1490 if !field.is_nullable() {
1491 return Err(());
1492 }
1493
1494 self.recurse_into_struct_field(field)
1495 }
1496}
1497
1498pub(crate) fn schema_contains_non_null_fields(schema: &Schema) -> bool {
1503 NonNullFieldChecker.transform_struct(schema).is_err()
1504}
1505
1506pub(crate) fn normalize_column_names_to_schema_casing(
1518 schema: &StructType,
1519 columns: &[ColumnName],
1520) -> Vec<ColumnName> {
1521 columns
1522 .iter()
1523 .map(|col| {
1524 let path = col.path();
1525 let mut normalized: Vec<String> = Vec::with_capacity(path.len());
1526 let mut current_schema = schema;
1527 for (i, field_name) in path.iter().enumerate() {
1528 match current_schema
1529 .fields()
1530 .find(|f| f.name().eq_ignore_ascii_case(field_name))
1531 {
1532 Some(f) => {
1533 normalized.push(f.name().to_string());
1534 if let DataType::Struct(inner) = f.data_type() {
1535 current_schema = inner;
1536 }
1537 }
1538 None => {
1539 normalized.extend(path[i..].iter().cloned());
1542 break;
1543 }
1544 }
1545 }
1546 ColumnName::new(normalized.iter().map(|s| s.as_str()))
1547 })
1548 .collect()
1549}
1550
1551#[internal_api]
1553#[derive(Clone, Default)]
1554pub(crate) struct ColumnNamesAndTypes(Vec<ColumnName>, Vec<DataType>);
1555impl ColumnNamesAndTypes {
1556 #[internal_api]
1557 pub(crate) fn as_ref(&self) -> (&[ColumnName], &[DataType]) {
1558 (&self.0, &self.1)
1559 }
1560}
1561
1562impl From<(Vec<ColumnName>, Vec<DataType>)> for ColumnNamesAndTypes {
1563 fn from((names, fields): (Vec<ColumnName>, Vec<DataType>)) -> Self {
1564 ColumnNamesAndTypes(names, fields)
1565 }
1566}
1567
/// Serde mirror of a `StructType`'s JSON form: `{"type": <type name>, "fields": [...]}`.
/// The `Deserialize` impl for `StructType` reads this shape and then rebuilds the struct
/// through `StructType::try_new`.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct StructTypeSerDeHelper {
    /// Type tag, stored under the JSON key `type`.
    #[serde(rename = "type")]
    type_name: String,
    /// The struct's fields, in order.
    fields: Vec<StructField>,
}
1575
1576impl Serialize for StructType {
1577 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1578 where
1579 S: serde::Serializer,
1580 {
1581 StructTypeSerDeHelper {
1582 type_name: self.type_name.clone(),
1583 fields: self.fields.values().cloned().collect(),
1584 }
1585 .serialize(serializer)
1586 }
1587}
1588
1589impl<'de> Deserialize<'de> for StructType {
1590 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1591 where
1592 D: serde::Deserializer<'de>,
1593 Self: Sized,
1594 {
1595 let helper = StructTypeSerDeHelper::deserialize(deserializer)?;
1596 StructType::try_new(helper.fields).map_err(serde::de::Error::custom)
1597 }
1598}
1599
/// An array type whose elements all have type `element_type`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ArrayType {
    /// Type tag stored under the JSON key `type`; `ArrayType::new` sets it to `"array"`.
    #[serde(rename = "type")]
    pub type_name: String,
    /// Type of every element (JSON key `elementType`).
    pub element_type: DataType,
    /// Whether individual elements may be null (JSON key `containsNull`).
    pub contains_null: bool,
}
1610
1611impl ArrayType {
1612 pub fn new(element_type: DataType, contains_null: bool) -> Self {
1613 Self {
1614 type_name: "array".into(),
1615 element_type,
1616 contains_null,
1617 }
1618 }
1619
1620 #[inline]
1621 pub const fn element_type(&self) -> &DataType {
1622 &self.element_type
1623 }
1624
1625 #[inline]
1626 pub const fn contains_null(&self) -> bool {
1627 self.contains_null
1628 }
1629}
1630
/// A map type from `key_type` keys to `value_type` values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MapType {
    /// Type tag stored under the JSON key `type`; `MapType::new` sets it to `"map"`.
    #[serde(rename = "type")]
    pub type_name: String,
    /// Type of the keys (JSON key `keyType`).
    pub key_type: DataType,
    /// Type of the values (JSON key `valueType`).
    pub value_type: DataType,
    /// Whether values may be null (JSON key `valueContainsNull`). When the key is absent
    /// during deserialization, this defaults to `true` via `default_true`.
    #[serde(default = "default_true")]
    pub value_contains_null: bool,
}
1644
1645impl MapType {
1646 pub fn new(
1647 key_type: impl Into<DataType>,
1648 value_type: impl Into<DataType>,
1649 value_contains_null: bool,
1650 ) -> Self {
1651 Self {
1652 type_name: "map".into(),
1653 key_type: key_type.into(),
1654 value_type: value_type.into(),
1655 value_contains_null,
1656 }
1657 }
1658
1659 #[inline]
1660 pub const fn key_type(&self) -> &DataType {
1661 &self.key_type
1662 }
1663
1664 #[inline]
1665 pub const fn value_type(&self) -> &DataType {
1666 &self.value_type
1667 }
1668
1669 #[inline]
1670 pub const fn value_contains_null(&self) -> bool {
1671 self.value_contains_null
1672 }
1673
1674 pub fn as_struct_schema(&self, key_name: String, val_name: String) -> Schema {
1677 StructType::new_unchecked([
1678 StructField::not_null(key_name, self.key_type.clone()),
1679 StructField::new(val_name, self.value_type.clone(), self.value_contains_null),
1680 ])
1681 }
1682}
1683
/// Serde default hook returning `true` (used for `MapType::value_contains_null`).
fn default_true() -> bool {
    true
}
1687
/// A decimal type described by its `precision` and `scale`.
///
/// `DecimalType::try_new` enforces `1 <= precision <= 38` and `scale <= precision`.
/// NOTE(review): the derived `Deserialize` fills the fields directly without calling `try_new`,
/// so deserializing a `DecimalType` on its own skips those checks. The `PrimitiveType`
/// deserializer does go through `try_new`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct DecimalType {
    precision: u8,
    scale: u8,
}
1693
1694impl DecimalType {
1695 pub fn try_new(precision: u8, scale: u8) -> DeltaResult<Self> {
1697 require!(
1698 0 < precision && precision <= 38,
1699 Error::invalid_decimal(format!(
1700 "precision must be in range 1..38 inclusive, found: {precision}."
1701 ))
1702 );
1703 require!(
1704 scale <= precision,
1705 Error::invalid_decimal(format!(
1706 "scale must be in range 0..{precision} inclusive, found: {scale}."
1707 ))
1708 );
1709 Ok(Self { precision, scale })
1710 }
1711
1712 pub fn precision(&self) -> u8 {
1713 self.precision
1714 }
1715
1716 pub fn scale(&self) -> u8 {
1717 self.scale
1718 }
1719}
1720
/// A scalar (non-nested) Delta data type.
///
/// Serializes to a bare JSON string: the camelCase variant name (e.g. `"string"`, `"long"`),
/// except where a variant below overrides it. Deserialization is hand-written (see the
/// `Deserialize` impl for this type) rather than derived.
#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
#[serde(rename_all = "camelCase")]
pub enum PrimitiveType {
    String,
    Long,
    Integer,
    Short,
    Byte,
    Float,
    Double,
    Boolean,
    Binary,
    Date,
    Timestamp,
    /// Serialized as `"timestamp_ntz"`.
    #[serde(rename = "timestamp_ntz")]
    TimestampNtz,
    /// Serialized as `"timestamp_nanos"`; only exists with the `nanosecond-timestamps` feature.
    #[cfg(feature = "nanosecond-timestamps")]
    #[serde(rename = "timestamp_nanos")]
    TimestampNanos,
    /// Serialized as `"decimal(<precision>,<scale>)"` via `serialize_decimal`.
    #[serde(serialize_with = "serialize_decimal", untagged)]
    Decimal(DecimalType),
}
1752
1753impl PrimitiveType {
1754 pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
1755 Ok(DecimalType::try_new(precision, scale)?.into())
1756 }
1757
1758 #[internal_api]
1767 pub(crate) fn can_widen_to(&self, target: &Self) -> bool {
1768 use PrimitiveType::*;
1769 matches!(
1770 (self, target),
1771 (Byte, Short | Integer | Long)
1773 | (Short, Integer | Long)
1774 | (Integer, Long)
1775 | (Float, Double)
1777 | (Timestamp, TimestampNtz)
1781 | (TimestampNtz, Timestamp)
1782 )
1783 }
1784
1785 pub(crate) fn is_checkpoint_cast_compatible(&self, target: &Self) -> bool {
1798 matches!(
1799 (self, target),
1800 (Self::Integer, Self::Date) | (Self::Long, Self::Timestamp | Self::TimestampNtz)
1801 )
1802 }
1803
1804 pub(crate) fn is_stats_type_compatible_with(&self, target: &Self) -> bool {
1814 self == target || self.can_widen_to(target) || self.is_checkpoint_cast_compatible(target)
1815 }
1816}
1817
1818fn serialize_decimal<S: serde::Serializer>(
1819 dtype: &DecimalType,
1820 serializer: S,
1821) -> Result<S::Ok, S::Error> {
1822 serializer.serialize_str(&format!("decimal({},{})", dtype.precision(), dtype.scale()))
1823}
1824
1825fn serialize_variant<S: serde::Serializer>(
1826 _: &StructType,
1827 serializer: S,
1828) -> Result<S::Ok, S::Error> {
1829 serializer.serialize_str("variant")
1830}
1831
1832impl<'de> serde::Deserialize<'de> for PrimitiveType {
1836 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1837 where
1838 D: serde::Deserializer<'de>,
1839 {
1840 let str_value = String::deserialize(deserializer)?;
1841
1842 match str_value.as_str() {
1843 "string" => Ok(PrimitiveType::String),
1844 "long" => Ok(PrimitiveType::Long),
1845 "integer" => Ok(PrimitiveType::Integer),
1846 "short" => Ok(PrimitiveType::Short),
1847 "byte" => Ok(PrimitiveType::Byte),
1848 "float" => Ok(PrimitiveType::Float),
1849 "double" => Ok(PrimitiveType::Double),
1850 "boolean" => Ok(PrimitiveType::Boolean),
1851 "binary" => Ok(PrimitiveType::Binary),
1852 "date" => Ok(PrimitiveType::Date),
1853 "timestamp" => Ok(PrimitiveType::Timestamp),
1854 "timestamp_ntz" => Ok(PrimitiveType::TimestampNtz),
1855 #[cfg(feature = "nanosecond-timestamps")]
1856 "timestamp_nanos" => Ok(PrimitiveType::TimestampNanos),
1857 decimal_str if decimal_str.starts_with("decimal(") && decimal_str.ends_with(')') => {
1858 let mut parts = decimal_str[8..decimal_str.len() - 1].split(',');
1860 let precision = parts
1861 .next()
1862 .and_then(|part| part.trim().parse::<u8>().ok())
1863 .ok_or_else(|| {
1864 serde::de::Error::custom(format!(
1865 "Invalid precision in decimal: {decimal_str}"
1866 ))
1867 })?;
1868 let scale = parts
1869 .next()
1870 .and_then(|part| part.trim().parse::<u8>().ok())
1871 .ok_or_else(|| {
1872 serde::de::Error::custom(format!("Invalid scale in decimal: {decimal_str}"))
1873 })?;
1874 if parts.next().is_some() {
1876 return Err(serde::de::Error::custom(format!(
1877 "Invalid decimal format (expected 2 parts): {decimal_str}"
1878 )));
1879 }
1880 DecimalType::try_new(precision, scale)
1881 .map(PrimitiveType::Decimal)
1882 .map_err(serde::de::Error::custom)
1883 }
1884 unsupported => Err(serde::de::Error::custom(format!(
1885 "Unsupported Delta table type: '{unsupported}'"
1886 ))),
1887 }
1888 }
1889}
1890
1891impl Display for PrimitiveType {
1892 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1893 match self {
1894 PrimitiveType::String => write!(f, "string"),
1895 PrimitiveType::Long => write!(f, "long"),
1896 PrimitiveType::Integer => write!(f, "integer"),
1897 PrimitiveType::Short => write!(f, "short"),
1898 PrimitiveType::Byte => write!(f, "byte"),
1899 PrimitiveType::Float => write!(f, "float"),
1900 PrimitiveType::Double => write!(f, "double"),
1901 PrimitiveType::Boolean => write!(f, "boolean"),
1902 PrimitiveType::Binary => write!(f, "binary"),
1903 PrimitiveType::Date => write!(f, "date"),
1904 PrimitiveType::Timestamp => write!(f, "timestamp"),
1905 PrimitiveType::TimestampNtz => write!(f, "timestamp_ntz"),
1906 #[cfg(feature = "nanosecond-timestamps")]
1907 PrimitiveType::TimestampNanos => write!(f, "timestamp_nanos"),
1908 PrimitiveType::Decimal(dtype) => {
1909 write!(f, "decimal({},{})", dtype.precision(), dtype.scale())
1910 }
1911 }
1912 }
1913}
1914
/// Any Delta data type: a primitive, or one of the nested types.
///
/// It serializes untagged:
/// - primitives become their bare type-name string;
/// - `Array`, `Struct` and `Map` become their JSON objects, each carrying its own `"type"` key;
/// - `Variant` becomes the bare string `"variant"`.
///
/// Deserialization is hand-written (see the `Deserialize` impl for this type).
#[derive(Debug, Serialize, PartialEq, Clone, Eq)]
#[serde(untagged, rename_all = "camelCase")]
pub enum DataType {
    /// A scalar type.
    Primitive(PrimitiveType),
    /// An array type.
    Array(Box<ArrayType>),
    /// A struct type.
    Struct(Box<StructType>),
    /// A map type.
    Map(Box<MapType>),
    /// A variant type.
    ///
    /// The boxed struct lists the variant's physical fields; see
    /// `DataType::unshredded_variant` for the default `metadata`/`value` layout. It is not
    /// written out when serializing.
    #[serde(serialize_with = "serialize_variant")]
    Variant(Box<StructType>),
}
1933
1934impl From<DecimalType> for PrimitiveType {
1935 fn from(dtype: DecimalType) -> Self {
1936 PrimitiveType::Decimal(dtype)
1937 }
1938}
1939impl From<DecimalType> for DataType {
1940 fn from(dtype: DecimalType) -> Self {
1941 PrimitiveType::from(dtype).into()
1942 }
1943}
1944impl From<PrimitiveType> for DataType {
1945 fn from(ptype: PrimitiveType) -> Self {
1946 DataType::Primitive(ptype)
1947 }
1948}
1949impl From<MapType> for DataType {
1950 fn from(map_type: MapType) -> Self {
1951 DataType::Map(Box::new(map_type))
1952 }
1953}
1954
1955impl From<StructType> for DataType {
1956 fn from(struct_type: StructType) -> Self {
1957 DataType::Struct(Box::new(struct_type))
1958 }
1959}
1960
1961impl From<ArrayType> for DataType {
1962 fn from(array_type: ArrayType) -> Self {
1963 DataType::Array(Box::new(array_type))
1964 }
1965}
1966
1967impl From<SchemaRef> for DataType {
1968 fn from(schema: SchemaRef) -> Self {
1969 Arc::unwrap_or_clone(schema).into()
1970 }
1971}
1972
1973impl<'de> serde::Deserialize<'de> for DataType {
1978 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1979 where
1980 D: serde::Deserializer<'de>,
1981 {
1982 use serde::de::Error;
1983 use serde_json::Value;
1984
1985 let value = Value::deserialize(deserializer)?;
1986
1987 if let Value::String(s) = &value {
1989 if s == "variant" {
1990 return match DataType::unshredded_variant() {
1991 DataType::Variant(st) => Ok(DataType::Variant(st)),
1992 _ => Err(Error::custom("Failed to create variant type")),
1993 };
1994 }
1995
1996 return PrimitiveType::deserialize(value.clone())
1998 .map(DataType::Primitive)
1999 .map_err(|e| Error::custom(e.to_string()));
2000 }
2001
2002 if let Value::Object(map) = &value {
2004 if let Some(Value::String(type_str)) = map.get("type") {
2005 return match type_str.as_str() {
2006 "array" => ArrayType::deserialize(value)
2007 .map(|at| DataType::Array(Box::new(at)))
2008 .map_err(|e| Error::custom(e.to_string())),
2009 "struct" => StructType::deserialize(value)
2010 .map(|st| DataType::Struct(Box::new(st)))
2011 .map_err(|e| Error::custom(e.to_string())),
2012 "map" => MapType::deserialize(value)
2013 .map(|mt| DataType::Map(Box::new(mt)))
2014 .map_err(|e| Error::custom(e.to_string())),
2015 _ => Err(Error::custom(format!("Unknown complex type: '{type_str}'"))),
2016 };
2017 }
2018 }
2019
2020 Err(Error::custom(format!(
2022 "Invalid data type: {}",
2023 serde_json::to_string(&value).unwrap_or_else(|_| format!("{value:?}"))
2024 )))
2025 }
2026}
2027
2028impl DataType {
2030 pub const STRING: Self = DataType::Primitive(PrimitiveType::String);
2031 pub const LONG: Self = DataType::Primitive(PrimitiveType::Long);
2032 pub const INTEGER: Self = DataType::Primitive(PrimitiveType::Integer);
2033 pub const SHORT: Self = DataType::Primitive(PrimitiveType::Short);
2034 pub const BYTE: Self = DataType::Primitive(PrimitiveType::Byte);
2035 pub const FLOAT: Self = DataType::Primitive(PrimitiveType::Float);
2036 pub const DOUBLE: Self = DataType::Primitive(PrimitiveType::Double);
2037 pub const BOOLEAN: Self = DataType::Primitive(PrimitiveType::Boolean);
2038 pub const BINARY: Self = DataType::Primitive(PrimitiveType::Binary);
2039 pub const DATE: Self = DataType::Primitive(PrimitiveType::Date);
2040 pub const TIMESTAMP: Self = DataType::Primitive(PrimitiveType::Timestamp);
2041 pub const TIMESTAMP_NTZ: Self = DataType::Primitive(PrimitiveType::TimestampNtz);
2042 #[cfg(feature = "nanosecond-timestamps")]
2043 pub const TIMESTAMP_NANOS: Self = DataType::Primitive(PrimitiveType::TimestampNanos);
2044 pub fn decimal(precision: u8, scale: u8) -> DeltaResult<Self> {
2046 Ok(PrimitiveType::decimal(precision, scale)?.into())
2047 }
2048
2049 pub fn try_struct_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2051 Ok(StructType::try_new(fields)?.into())
2052 }
2053
2054 pub fn try_struct_type_from_results<E: Into<Error>>(
2056 fields: impl IntoIterator<Item = Result<StructField, E>>,
2057 ) -> DeltaResult<Self> {
2058 StructType::try_from_results(fields).map(Self::from)
2059 }
2060
2061 pub(crate) fn struct_type_unchecked(fields: impl IntoIterator<Item = StructField>) -> Self {
2063 StructType::new_unchecked(fields).into()
2064 }
2065
2066 pub fn unshredded_variant() -> Self {
2069 DataType::Variant(Box::new(StructType::new_unchecked([
2070 StructField::not_null("metadata", DataType::BINARY),
2071 StructField::not_null("value", DataType::BINARY),
2072 ])))
2073 }
2074
2075 pub fn variant_type(fields: impl IntoIterator<Item = StructField>) -> DeltaResult<Self> {
2078 Ok(DataType::Variant(Box::new(StructType::try_from_results(
2081 fields.into_iter().map(|field| {
2082 if field.is_metadata_column() {
2083 Err(Error::schema(
2084 "Metadata columns are not allowed in Variant types".to_string(),
2085 ))
2086 } else {
2087 Ok(field)
2088 }
2089 }),
2090 )?)))
2091 }
2092
2093 pub fn as_primitive_opt(&self) -> Option<&PrimitiveType> {
2096 match self {
2097 DataType::Primitive(ptype) => Some(ptype),
2098 _ => None,
2099 }
2100 }
2101
2102 fn fmt_recursive(&self, f: &mut Formatter<'_>, levels: &mut Vec<bool>) -> std::fmt::Result {
2103 match self {
2104 DataType::Struct(inner) => write_struct_type(inner, f, levels),
2105
2106 DataType::Array(inner) => {
2107 levels.push(true); write_indent(f, levels)?;
2109 writeln!(f, "array_element: {}", inner.element_type)?;
2110 inner.element_type.fmt_recursive(f, levels)?;
2111 levels.pop();
2112 Ok(())
2113 }
2114
2115 DataType::Map(inner) => {
2116 levels.push(false); write_indent(f, levels)?;
2119 writeln!(f, "map_key: {}", inner.key_type)?;
2120 inner.key_type.fmt_recursive(f, levels)?;
2121 levels.pop();
2122
2123 levels.push(true); write_indent(f, levels)?;
2126 writeln!(f, "map_value: {}", inner.value_type)?;
2127 inner.value_type.fmt_recursive(f, levels)?;
2128 levels.pop();
2129 Ok(())
2130 }
2131
2132 _ => Ok(()),
2133 }
2134 }
2135}
2136
2137impl Display for DataType {
2138 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2139 match self {
2140 DataType::Primitive(p) => write!(f, "{p}"),
2141 DataType::Array(a) => write!(f, "array<{}>", a.element_type),
2142 DataType::Struct(s) => {
2143 write!(f, "struct<")?;
2144 for (i, field) in s.fields().enumerate() {
2145 if i > 0 {
2146 write!(f, ", ")?;
2147 }
2148 write!(f, "{}: {}", field.name, field.data_type)?;
2149 }
2150 write!(f, ">")
2151 }
2152 DataType::Map(m) => write!(f, "map<{}, {}>", m.key_type, m.value_type),
2153 DataType::Variant(_) => write!(f, "variant"),
2154 }
2155 }
2156}
2157
2158struct GetSchemaLeaves {
2159 path: Vec<String>,
2160 names: Vec<ColumnName>,
2161 types: Vec<DataType>,
2162}
2163impl GetSchemaLeaves {
2164 fn new(own_name: Option<&str>) -> Self {
2165 Self {
2166 path: own_name.into_iter().map(|s| s.to_string()).collect(),
2167 names: vec![],
2168 types: vec![],
2169 }
2170 }
2171}
2172
2173impl<'a> SchemaTransform<'a> for GetSchemaLeaves {
2174 transform_output_type!(|'a, T| ());
2175
2176 fn transform_struct_field(&mut self, field: &'a StructField) {
2177 self.path.push(field.name.clone());
2178 if let DataType::Struct(_) = field.data_type {
2179 self.recurse_into_struct_field(field);
2180 } else {
2181 self.names.push(ColumnName::new(&self.path));
2182 self.types.push(field.data_type.clone());
2183 }
2184 self.path.pop();
2185 }
2186}
2187
2188struct MakePhysical<'a> {
2189 column_mapping_mode: ColumnMappingMode,
2190 path: Vec<&'a str>,
2191 seen: SeenColumnMappingAnnotations<'a>,
2195}
2196impl<'a> MakePhysical<'a> {
2197 fn new(column_mapping_mode: ColumnMappingMode) -> Self {
2198 Self {
2199 column_mapping_mode,
2200 path: vec![],
2201 seen: SeenColumnMappingAnnotations::default(),
2202 }
2203 }
2204
2205 fn transform_inner<T>(
2206 &mut self,
2207 field_name: &'a str,
2208 transform: impl FnOnce(&mut Self) -> DeltaResult<T>,
2209 ) -> DeltaResult<T> {
2210 self.path.push(field_name);
2211 let result = transform(self);
2212 self.path.pop();
2213 result
2214 }
2215}
impl<'a> SchemaTransform<'a> for MakePhysical<'a> {
    transform_output_type!(|'a, T| DeltaResult<Cow<'a, T>>);

    // Array elements and map keys/values have no field name of their own. A fixed placeholder
    // is pushed onto `path` so that validation sees a complete path through nested containers.
    fn transform_array_element(&mut self, etype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
        self.transform_inner("<array element>", |this| this.transform(etype))
    }
    fn transform_map_key(&mut self, ktype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
        self.transform_inner("<map key>", |this| this.transform(ktype))
    }
    fn transform_map_value(&mut self, vtype: &'a DataType) -> DeltaResult<Cow<'a, DataType>> {
        self.transform_inner("<map value>", |this| this.transform(vtype))
    }
    /// Validates `field`'s column-mapping annotations for the current mode and converts it.
    ///
    /// Metadata columns are returned unchanged. Every other field has its children
    /// transformed first, then gets its physical name and physical metadata.
    ///
    /// # Errors
    /// Propagates errors from annotation validation and from transforming nested fields.
    fn transform_struct_field(
        &mut self,
        field: &'a StructField,
    ) -> DeltaResult<Cow<'a, StructField>> {
        self.transform_inner(field.name(), |this| {
            // Validation runs for every field, metadata columns included, and records the
            // field's annotations in `this.seen`. The returned column-mapping ID is not needed
            // here.
            let (physical_name, _id) = validate_and_extract_column_mapping_annotations(
                field,
                this.column_mapping_mode,
                &this.path,
                Some(&mut this.seen),
            )?;

            if field.is_metadata_column() {
                return Ok(Cow::Borrowed(field));
            }

            // Physicalize nested fields before renaming this one.
            let field = this.recurse_into_struct_field(field)?;

            let metadata = field.logical_to_physical_metadata(this.column_mapping_mode);
            let name = physical_name.to_owned();

            Ok(Cow::Owned(field.with_name(name).with_metadata(metadata)))
        })
    }

    // Variant internals are passed through untouched. They are not descended into, so their
    // fields are neither validated nor renamed.
    fn transform_variant(&mut self, stype: &'a StructType) -> DeltaResult<Cow<'a, StructType>> {
        Ok(Cow::Borrowed(stype))
    }
}
2259
2260#[cfg(test)]
2261mod tests {
2262 use rstest::rstest;
2263 use serde_json;
2264
2265 use super::*;
2266 use crate::table_features::ColumnMappingMode;
2267 use crate::utils::test_utils::{
2268 assert_result_error_with_message, test_deep_nested_schema_missing_leaf_cm,
2269 };
2270
2271 fn example_schema_metadata() -> &'static str {
2272 r#"
2273 {
2274 "name": "e",
2275 "type": {
2276 "type": "array",
2277 "elementType": {
2278 "type": "struct",
2279 "fields": [
2280 {
2281 "name": "d",
2282 "type": "integer",
2283 "nullable": false,
2284 "metadata": {
2285 "delta.columnMapping.id": 5,
2286 "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
2287 }
2288 }
2289 ]
2290 },
2291 "containsNull": true
2292 },
2293 "nullable": true,
2294 "metadata": {
2295 "delta.columnMapping.id": 4,
2296 "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1",
2297 "delta.identity.start": 2147483648
2298 }
2299 }"#
2300 }
2301
2302 #[test]
2303 fn test_serde_data_types() {
2304 let data = r#"
2305 {
2306 "name": "a",
2307 "type": "integer",
2308 "nullable": false,
2309 "metadata": {}
2310 }
2311 "#;
2312 let field: StructField = serde_json::from_str(data).unwrap();
2313 assert!(matches!(field.data_type, DataType::INTEGER));
2314
2315 let data = r#"
2316 {
2317 "name": "c",
2318 "type": {
2319 "type": "array",
2320 "elementType": "integer",
2321 "containsNull": false
2322 },
2323 "nullable": true,
2324 "metadata": {}
2325 }
2326 "#;
2327 let field: StructField = serde_json::from_str(data).unwrap();
2328 assert!(matches!(field.data_type, DataType::Array(_)));
2329
2330 let data = r#"
2331 {
2332 "name": "e",
2333 "type": {
2334 "type": "array",
2335 "elementType": {
2336 "type": "struct",
2337 "fields": [
2338 {
2339 "name": "d",
2340 "type": "integer",
2341 "nullable": false,
2342 "metadata": {}
2343 }
2344 ]
2345 },
2346 "containsNull": true
2347 },
2348 "nullable": true,
2349 "metadata": {}
2350 }
2351 "#;
2352 let field: StructField = serde_json::from_str(data).unwrap();
2353 assert!(matches!(field.data_type, DataType::Array(_)));
2354 match field.data_type {
2355 DataType::Array(array) => assert!(matches!(array.element_type, DataType::Struct(_))),
2356 _ => unreachable!(),
2357 }
2358
2359 let data = r#"
2360 {
2361 "name": "f",
2362 "type": {
2363 "type": "map",
2364 "keyType": "string",
2365 "valueType": "string",
2366 "valueContainsNull": true
2367 },
2368 "nullable": true,
2369 "metadata": {}
2370 }
2371 "#;
2372 let field: StructField = serde_json::from_str(data).unwrap();
2373 assert!(matches!(field.data_type, DataType::Map(_)));
2374 }
2375
2376 #[test]
2377 fn test_roundtrip_decimal() {
2378 let data = r#"
2379 {
2380 "name": "a",
2381 "type": "decimal(10, 2)",
2382 "nullable": false,
2383 "metadata": {}
2384 }
2385 "#;
2386 let field: StructField = serde_json::from_str(data).unwrap();
2387 assert_eq!(field.data_type, DataType::decimal(10, 2).unwrap());
2388
2389 let json_str = serde_json::to_string(&field).unwrap();
2390 assert_eq!(
2391 json_str,
2392 r#"{"name":"a","type":"decimal(10,2)","nullable":false,"metadata":{}}"#
2393 );
2394 }
2395
2396 #[test]
2397 fn test_roundtrip_variant() {
2398 let data = r#"
2399 {
2400 "name": "v",
2401 "type": "variant",
2402 "nullable": false,
2403 "metadata": {}
2404 }
2405 "#;
2406 let field: StructField = serde_json::from_str(data).unwrap();
2407 assert_eq!(field.data_type, DataType::unshredded_variant());
2408
2409 let json_str = serde_json::to_string(&field).unwrap();
2410 assert_eq!(
2411 json_str,
2412 r#"{"name":"v","type":"variant","nullable":false,"metadata":{}}"#
2413 );
2414 }
2415
2416 #[test]
2417 fn test_unshredded_variant() {
2418 let unshredded_variant_type = DataType::unshredded_variant();
2419
2420 match &unshredded_variant_type {
2421 DataType::Variant(struct_type) => {
2422 let fields: Vec<_> = struct_type.fields().collect();
2423 assert_eq!(fields.len(), 2);
2424
2425 assert_eq!(fields[0].name, "metadata");
2426 assert_eq!(fields[0].data_type, DataType::BINARY);
2427 assert!(!fields[0].nullable);
2428
2429 assert_eq!(fields[1].name, "value");
2430 assert_eq!(fields[1].data_type, DataType::BINARY);
2431 assert!(!fields[1].nullable);
2432 }
2433 _ => panic!("Expected DataType::Variant, got {unshredded_variant_type:?}"),
2434 }
2435 }
2436
2437 #[rstest]
2438 #[case("interval second")]
2439 #[case("interval day")]
2440 #[case("money")]
2441 fn test_unsupported_type_error_message(#[case] unsupported_type: &str) {
2442 let data = format!(
2443 r#"{{
2444 "name": "test_field",
2445 "type": "{unsupported_type}",
2446 "nullable": false,
2447 "metadata": {{}}
2448 }}"#
2449 );
2450 let result: Result<StructField, _> = serde_json::from_str(&data);
2451 assert!(result.is_err());
2452 let err = result.unwrap_err();
2453 let expected_msg = format!("Unsupported Delta table type: '{unsupported_type}'");
2454 assert!(
2455 err.to_string().contains(&expected_msg),
2456 "Expected error message about unsupported type '{unsupported_type}', got: {err}"
2457 );
2458 }
2459
2460 #[rstest]
2461 #[case("string", DataType::STRING)]
2462 #[case("long", DataType::LONG)]
2463 #[case("integer", DataType::INTEGER)]
2464 #[case("short", DataType::SHORT)]
2465 #[case("byte", DataType::BYTE)]
2466 #[case("float", DataType::FLOAT)]
2467 #[case("double", DataType::DOUBLE)]
2468 #[case("boolean", DataType::BOOLEAN)]
2469 #[case("binary", DataType::BINARY)]
2470 #[case("date", DataType::DATE)]
2471 #[case("timestamp", DataType::TIMESTAMP)]
2472 #[case("timestamp_ntz", DataType::TIMESTAMP_NTZ)]
2473 fn test_primitive_type_deserialization_still_works(
2474 #[case] type_str: &str,
2475 #[case] expected_type: DataType,
2476 ) {
2477 let data = format!(
2478 r#"{{
2479 "name": "test_field",
2480 "type": "{type_str}",
2481 "nullable": false,
2482 "metadata": {{}}
2483 }}"#
2484 );
2485 let field: StructField = serde_json::from_str(&data).unwrap();
2486 assert_eq!(field.data_type, expected_type);
2487 }
2488
2489 #[rstest]
2490 #[case(10, 2)]
2491 #[case(16, 4)]
2492 #[case(38, 10)]
2493 fn test_decimal_with_primitive_deserializer(#[case] precision: u8, #[case] scale: u8) {
2494 let data = format!(
2495 r#"{{
2496 "name": "test_decimal",
2497 "type": "decimal({precision},{scale})",
2498 "nullable": false,
2499 "metadata": {{}}
2500 }}"#
2501 );
2502 let field: StructField = serde_json::from_str(&data).unwrap();
2503 assert_eq!(
2504 field.data_type,
2505 DataType::decimal(precision, scale).unwrap()
2506 );
2507 }
2508
2509 #[rstest]
2510 #[case("decimal(invalid)", "Invalid precision in decimal")]
2511 #[case("decimal(10)", "Invalid scale in decimal")]
2512 #[case("decimal()", "Invalid precision in decimal")]
2513 #[case("decimal(10,2,99)", "Invalid decimal format (expected 2 parts)")]
2514 fn test_invalid_decimal_format(#[case] invalid_type: &str, #[case] expected_error: &str) {
2515 let data = format!(
2516 r#"{{
2517 "name": "invalid",
2518 "type": "{invalid_type}",
2519 "nullable": false,
2520 "metadata": {{}}
2521 }}"#
2522 );
2523 let result: Result<StructField, _> = serde_json::from_str(&data);
2524 assert!(result.is_err());
2525 let err = result.unwrap_err();
2526 assert!(
2527 err.to_string().contains(expected_error),
2528 "Expected error containing '{expected_error}', got: {err}"
2529 );
2530 }
2531
2532 #[rstest]
2533 #[case(
2534 r#"{"type": "array", "elementType": "integer", "containsNull": false}"#,
2535 DataType::Array(Box::new(ArrayType::new(DataType::INTEGER, false)))
2536 )]
2537 #[case(
2538 r#"{"type": "struct", "fields": [{"name": "a", "type": "integer", "nullable": false, "metadata": {}}, {"name": "b", "type": "string", "nullable": true, "metadata": {}}]}"#,
2539 DataType::Struct(Box::new(StructType::new_unchecked([
2540 StructField::new("a", DataType::INTEGER, false),
2541 StructField::new("b", DataType::STRING, true),
2542 ])))
2543 )]
2544 #[case(
2545 r#"{"type": "map", "keyType": "string", "valueType": "integer", "valueContainsNull": true}"#,
2546 DataType::Map(Box::new(MapType::new(DataType::STRING, DataType::INTEGER, true)))
2547 )]
2548 #[case("\"string\"", DataType::STRING)]
2549 #[case("\"long\"", DataType::LONG)]
2550 #[case("\"integer\"", DataType::INTEGER)]
2551 #[case("\"short\"", DataType::SHORT)]
2552 #[case("\"byte\"", DataType::BYTE)]
2553 #[case("\"float\"", DataType::FLOAT)]
2554 #[case("\"double\"", DataType::DOUBLE)]
2555 #[case("\"boolean\"", DataType::BOOLEAN)]
2556 #[case("\"binary\"", DataType::BINARY)]
2557 #[case("\"date\"", DataType::DATE)]
2558 #[case("\"timestamp\"", DataType::TIMESTAMP)]
2559 #[case("\"timestamp_ntz\"", DataType::TIMESTAMP_NTZ)]
2560 #[case("\"variant\"", DataType::unshredded_variant())]
2561 fn test_data_type_deserialization(#[case] type_json: &str, #[case] expected: DataType) {
2562 let data_type: DataType = serde_json::from_str(type_json).unwrap();
2563 assert_eq!(data_type, expected);
2564 }
2565
2566 #[test]
2567 fn test_make_physical_no_column_mapping() {
2568 let field = StructField::nullable(
2569 "e",
2570 ArrayType::new(
2571 StructType::new_unchecked([StructField::not_null("d", DataType::INTEGER)]).into(),
2572 true,
2573 ),
2574 );
2575 let physical_field = field.make_physical(ColumnMappingMode::None).unwrap();
2576
2577 assert_eq!(physical_field.name, "e");
2578 assert!(physical_field
2579 .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2580 .is_none());
2581 assert!(physical_field
2582 .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
2583 .is_none());
2584
2585 let DataType::Array(atype) = physical_field.data_type else {
2586 panic!("Expected an Array");
2587 };
2588 let DataType::Struct(stype) = atype.element_type else {
2589 panic!("Expected a Struct");
2590 };
2591 let struct_field = stype.fields.get_index(0).unwrap().1;
2592 assert_eq!(struct_field.name, "d");
2593 }
2594
2595 #[test]
2596 fn test_make_physical_rejects_annotated_fields_when_column_mapping_disabled() {
2597 let data = example_schema_metadata();
2598 let field: StructField = serde_json::from_str(data).unwrap();
2599 assert!(field.make_physical(ColumnMappingMode::None).is_err());
2600 }
2601
2602 #[test]
2603 fn test_make_physical_rejects_unannotated_leaf_in_deep_nesting() {
2604 let schema = test_deep_nested_schema_missing_leaf_cm();
2605 let field = schema.fields().next().unwrap();
2606 let err = field
2607 .make_physical(ColumnMappingMode::Name)
2608 .unwrap_err()
2609 .to_string();
2610 assert!(
2611 err.contains("top.`<array element>`.mid_field.`<map value>`.leaf"),
2612 "Expected full nested path in error, got: {err}"
2613 );
2614 }
2615
2616 #[test]
2617 fn test_make_physical_rejects_duplicate_column_mapping_ids() {
2618 use crate::schema::ColumnMetadataKey;
2619
2620 fn cm_field(name: &str, id: i64, data_type: impl Into<DataType>) -> StructField {
2621 StructField::not_null(name, data_type).with_metadata([
2622 (
2623 ColumnMetadataKey::ColumnMappingId.as_ref(),
2624 MetadataValue::Number(id),
2625 ),
2626 (
2627 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2628 MetadataValue::String(format!("col-{name}")),
2629 ),
2630 ])
2631 }
2632
2633 let inner = StructType::new_unchecked([
2634 cm_field("x", 3, DataType::INTEGER),
2635 cm_field("y", 4, DataType::STRING),
2636 ]);
2637 let schema = StructType::new_unchecked([
2638 cm_field("a", 1, DataType::INTEGER),
2639 cm_field(
2640 "b",
2641 2,
2642 ArrayType::new(DataType::Struct(Box::new(inner)), true),
2643 ),
2644 cm_field("c", 3, DataType::STRING),
2645 ]);
2646 assert_result_error_with_message(
2647 schema.make_physical(ColumnMappingMode::Id),
2648 "Duplicate column mapping ID",
2649 );
2650 }
2651
2652 #[rstest]
2658 #[case::same_level("a", "a")]
2659 #[case::nested("a", "x")]
2660 fn test_make_physical_rejects_duplicate_physical_names(
2661 #[case] outer_name: &str,
2662 #[case] inner_name: &str,
2663 ) {
2664 use crate::schema::ColumnMetadataKey;
2665
2666 fn cm_field(
2671 name: &str,
2672 id: i64,
2673 physical: &str,
2674 data_type: impl Into<DataType>,
2675 ) -> StructField {
2676 StructField::not_null(name, data_type).with_metadata([
2677 (
2678 ColumnMetadataKey::ColumnMappingId.as_ref(),
2679 MetadataValue::Number(id),
2680 ),
2681 (
2682 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2683 MetadataValue::String(physical.to_string()),
2684 ),
2685 ])
2686 }
2687
2688 let schema = if outer_name == inner_name {
2689 StructType::new_unchecked([
2690 cm_field(outer_name, 1, "col-shared", DataType::INTEGER),
2691 cm_field("sibling", 2, "col-shared", DataType::STRING),
2692 ])
2693 } else {
2694 let inner = StructType::new_unchecked([cm_field(
2695 inner_name,
2696 2,
2697 "col-shared",
2698 DataType::INTEGER,
2699 )]);
2700 StructType::new_unchecked([
2701 cm_field(outer_name, 1, "col-shared", DataType::INTEGER),
2702 cm_field(
2703 "nested_holder",
2704 3,
2705 "col-nested-holder",
2706 DataType::Struct(Box::new(inner)),
2707 ),
2708 ])
2709 };
2710 assert_result_error_with_message(
2711 schema.make_physical(ColumnMappingMode::Name),
2712 "Duplicate `delta.columnMapping.physicalName` 'col-shared'",
2713 );
2714 }
2715
2716 #[test]
2717 fn test_make_physical_column_mapping() {
2718 [ColumnMappingMode::Name, ColumnMappingMode::Id]
2719 .into_iter()
2720 .for_each(|mode| {
2721 let data = example_schema_metadata();
2722
2723 let field: StructField = serde_json::from_str(data).unwrap();
2724
2725 let col_id = field
2726 .get_config_value(&ColumnMetadataKey::ColumnMappingId)
2727 .unwrap();
2728 let id_start = field
2729 .get_config_value(&ColumnMetadataKey::IdentityStart)
2730 .unwrap();
2731 assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
2732 assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64));
2733 assert_eq!(
2734 field.physical_name(mode),
2735 "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2736 );
2737 let physical_field = field.make_physical(mode).unwrap();
2738
2739 match mode {
2741 ColumnMappingMode::Id => {
2742 assert!(matches!(
2743 physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2744 Some(MetadataValue::Number(4))
2745 ));
2746
2747 assert!(matches!(
2748 physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2749 Some(MetadataValue::Number(4))
2750 ));
2751 }
2752 ColumnMappingMode::Name => {
2753 assert!(matches!(
2754 physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2755 Some(MetadataValue::Number(4))
2756 ));
2757 assert!(matches!(
2758 physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2759 Some(MetadataValue::Number(4))
2760 ));
2761 }
2762 ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2763 }
2764
2765 assert_eq!(
2766 physical_field.name,
2767 "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
2768 );
2769 let DataType::Array(atype) = physical_field.data_type else {
2770 panic!("Expected an Array");
2771 };
2772 let DataType::Struct(stype) = atype.element_type else {
2773 panic!("Expected a Struct");
2774 };
2775
2776 let struct_field = stype.fields.get_index(0).unwrap().1;
2777 assert_eq!(
2778 struct_field.name,
2779 "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
2780 );
2781
2782 match mode {
2784 ColumnMappingMode::Id => {
2785 assert!(matches!(
2786 struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2787 Some(MetadataValue::Number(5))
2788 ));
2789 assert!(matches!(
2790 struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2791 Some(MetadataValue::Number(5))
2792 ));
2793 }
2794 ColumnMappingMode::Name => {
2795 assert!(matches!(
2796 struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
2797 Some(MetadataValue::Number(5))
2798 ));
2799 assert!(matches!(
2800 struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
2801 Some(MetadataValue::Number(5))
2802 ));
2803 }
2804 ColumnMappingMode::None => panic!("unexpected column mapping mode"),
2805 }
2806 });
2807 }
2808
2809 #[test]
2810 fn test_make_physical_passes_metadata_column_through() {
2811 let field = StructField::create_metadata_column(
2812 "_metadata.row_index",
2813 MetadataColumnSpec::RowIndex,
2814 );
2815 for mode in [
2816 ColumnMappingMode::None,
2817 ColumnMappingMode::Name,
2818 ColumnMappingMode::Id,
2819 ] {
2820 let physical = field.make_physical(mode).unwrap();
2821 assert_eq!(physical.name(), "_metadata.row_index");
2822 assert!(physical.is_metadata_column());
2823 }
2824 }
2825
2826 #[test]
2827 fn test_make_physical_rejects_metadata_column_with_cm_annotations() {
2828 let field = StructField::create_metadata_column(
2829 "_metadata.row_index",
2830 MetadataColumnSpec::RowIndex,
2831 )
2832 .add_metadata([(
2833 ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
2834 MetadataValue::String("phys".to_string()),
2835 )]);
2836 assert_result_error_with_message(
2837 field.make_physical(ColumnMappingMode::Name),
2838 "must not have column mapping annotations",
2839 );
2840 }
2841
2842 #[test]
2843 fn test_read_schemas() {
2844 let file = std::fs::File::open("./tests/serde/schema.json").unwrap();
2845 let schema: Result<Schema, _> = serde_json::from_reader(file);
2846 assert!(schema.is_ok());
2847
2848 let file = std::fs::File::open("./tests/serde/checkpoint_schema.json").unwrap();
2849 let schema: Result<Schema, _> = serde_json::from_reader(file);
2850 assert!(schema.is_ok())
2851 }
2852
2853 #[test]
2854 fn test_invalid_decimal() {
2855 let data = r#"
2856 {
2857 "name": "a",
2858 "type": "decimal(39, 10)",
2859 "nullable": false,
2860 "metadata": {}
2861 }
2862 "#;
2863 assert!(serde_json::from_str::<StructField>(data).is_err());
2864
2865 let data = r#"
2866 {
2867 "name": "a",
2868 "type": "decimal(10, 39)",
2869 "nullable": false,
2870 "metadata": {}
2871 }
2872 "#;
2873 assert!(serde_json::from_str::<StructField>(data).is_err());
2874 }
2875
2876 #[test]
2877 fn test_metadata_value_to_string() {
2878 assert_eq!(MetadataValue::Number(0).to_string(), "0");
2879 assert_eq!(
2880 MetadataValue::String("hello".to_string()).to_string(),
2881 "hello"
2882 );
2883 assert_eq!(MetadataValue::Boolean(true).to_string(), "true");
2884 assert_eq!(MetadataValue::Boolean(false).to_string(), "false");
2885 let object_json = serde_json::json!({ "an": "object" });
2886 assert_eq!(
2887 MetadataValue::Other(object_json).to_string(),
2888 "{\"an\":\"object\"}"
2889 );
2890 let array_json = serde_json::json!(["an", "array"]);
2891 assert_eq!(
2892 MetadataValue::Other(array_json).to_string(),
2893 "[\"an\",\"array\"]"
2894 );
2895 }
2896
2897 #[test]
2898 fn test_num_fields() {
2899 let schema = StructType::new_unchecked([]);
2900 assert!(schema.num_fields() == 0);
2901 let schema = StructType::new_unchecked([
2902 StructField::nullable("a", DataType::LONG),
2903 StructField::nullable("b", DataType::LONG),
2904 StructField::nullable("c", DataType::LONG),
2905 StructField::nullable("d", DataType::LONG),
2906 ]);
2907 assert_eq!(schema.num_fields(), 4);
2908 let schema = StructType::new_unchecked([
2909 StructField::nullable("b", DataType::LONG),
2910 StructField::not_null("b", DataType::LONG),
2911 StructField::nullable("c", DataType::LONG),
2912 StructField::nullable("c", DataType::LONG),
2913 ]);
2914 assert_eq!(schema.num_fields(), 2);
2915 }
2916
2917 #[test]
2918 fn test_has_invariants() {
2919 let schema = StructType::new_unchecked([
2921 StructField::nullable("a", DataType::STRING),
2922 StructField::nullable("b", DataType::INTEGER),
2923 ]);
2924 assert!(!schema_has_invariants(&schema));
2925
2926 let mut field = StructField::nullable("c", DataType::STRING);
2928 field.metadata.insert(
2929 ColumnMetadataKey::Invariants.as_ref().to_string(),
2930 MetadataValue::String("c > 0".to_string()),
2931 );
2932
2933 let schema =
2934 StructType::new_unchecked([StructField::nullable("a", DataType::STRING), field]);
2935 assert!(schema_has_invariants(&schema));
2936
2937 let nested_field = StructField::nullable(
2939 "nested_c",
2940 DataType::try_struct_type([{
2941 let mut field = StructField::nullable("d", DataType::INTEGER);
2942 field.metadata.insert(
2943 ColumnMetadataKey::Invariants.as_ref().to_string(),
2944 MetadataValue::String("d > 0".to_string()),
2945 );
2946 field
2947 }])
2948 .unwrap(),
2949 );
2950
2951 let schema = StructType::new_unchecked([
2952 StructField::nullable("a", DataType::STRING),
2953 StructField::nullable("b", DataType::INTEGER),
2954 nested_field,
2955 ]);
2956 assert!(schema_has_invariants(&schema));
2957
2958 let array_field = StructField::nullable(
2960 "array_field",
2961 ArrayType::new(
2962 DataType::try_struct_type([{
2963 let mut field = StructField::nullable("d", DataType::INTEGER);
2964 field.metadata.insert(
2965 ColumnMetadataKey::Invariants.as_ref().to_string(),
2966 MetadataValue::String("d > 0".to_string()),
2967 );
2968 field
2969 }])
2970 .unwrap(),
2971 true,
2972 ),
2973 );
2974
2975 let schema = StructType::new_unchecked([
2976 StructField::nullable("a", DataType::STRING),
2977 StructField::nullable("b", DataType::INTEGER),
2978 array_field,
2979 ]);
2980 assert!(schema_has_invariants(&schema));
2981
2982 let map_field = StructField::nullable(
2984 "map_field",
2985 MapType::new(
2986 DataType::STRING,
2987 DataType::try_struct_type([{
2988 let mut field = StructField::nullable("d", DataType::INTEGER);
2989 field.metadata.insert(
2990 ColumnMetadataKey::Invariants.as_ref().to_string(),
2991 MetadataValue::String("d > 0".to_string()),
2992 );
2993 field
2994 }])
2995 .unwrap(),
2996 true,
2997 ),
2998 );
2999
3000 let schema = StructType::new_unchecked([
3001 StructField::nullable("a", DataType::STRING),
3002 StructField::nullable("b", DataType::INTEGER),
3003 map_field,
3004 ]);
3005 assert!(schema_has_invariants(&schema));
3006 }
3007
3008 fn all_nullable_schema() -> StructType {
3009 StructType::new_unchecked([
3010 StructField::nullable("a", DataType::STRING),
3011 StructField::nullable("b", DataType::INTEGER),
3012 ])
3013 }
3014
3015 fn top_level_non_null_schema() -> StructType {
3016 StructType::new_unchecked([
3017 StructField::not_null("id", DataType::INTEGER),
3018 StructField::nullable("name", DataType::STRING),
3019 ])
3020 }
3021
3022 fn nested_non_null_schema() -> StructType {
3023 let nested_field = StructField::nullable(
3024 "parent",
3025 DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)]).unwrap(),
3026 );
3027 StructType::new_unchecked([StructField::nullable("a", DataType::STRING), nested_field])
3028 }
3029
3030 fn array_non_null_schema() -> StructType {
3031 let array_field = StructField::nullable(
3032 "arr",
3033 ArrayType::new(
3034 DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3035 .unwrap(),
3036 true,
3037 ),
3038 );
3039 StructType::new_unchecked([array_field])
3040 }
3041
3042 fn map_non_null_schema() -> StructType {
3043 let map_field = StructField::nullable(
3044 "map",
3045 MapType::new(
3046 DataType::STRING,
3047 DataType::try_struct_type([StructField::not_null("child", DataType::INTEGER)])
3048 .unwrap(),
3049 true,
3050 ),
3051 );
3052 StructType::new_unchecked([map_field])
3053 }
3054
3055 fn variant_only_schema() -> StructType {
3056 StructType::new_unchecked([StructField::nullable("v", DataType::unshredded_variant())])
3059 }
3060
3061 #[rstest]
3062 #[case::all_nullable(all_nullable_schema(), false)]
3063 #[case::top_level(top_level_non_null_schema(), true)]
3064 #[case::nested_struct(nested_non_null_schema(), true)]
3065 #[case::array_element(array_non_null_schema(), true)]
3066 #[case::map_value(map_non_null_schema(), true)]
3067 #[case::variant_skipped(variant_only_schema(), false)]
3068 fn test_schema_contains_non_null_fields(#[case] schema: StructType, #[case] expected: bool) {
3069 assert_eq!(schema_contains_non_null_fields(&schema), expected);
3070 }
3071
3072 #[test]
3073 fn test_struct_type_iterator_basic() {
3074 let fields = vec![
3075 StructField::new("field1", DataType::STRING, true),
3076 StructField::new("field2", DataType::INTEGER, false),
3077 StructField::new("field3", DataType::BOOLEAN, true),
3078 ];
3079 let struct_type = StructType::new_unchecked(fields.clone());
3080
3081 let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3083 assert_eq!(field_names, vec!["field1", "field2", "field3"]);
3084
3085 assert_eq!(struct_type.field("field1").unwrap().name, "field1");
3087 }
3088
3089 #[test]
3090 fn test_struct_type_into_iterator_owned() {
3091 let fields = vec![
3092 StructField::new("a", DataType::STRING, true),
3093 StructField::new("b", DataType::INTEGER, false),
3094 ];
3095 let struct_type = StructType::new_unchecked(fields);
3096
3097 let mut field_names = Vec::new();
3099 for field in struct_type {
3100 field_names.push(field.name);
3101 }
3102 assert_eq!(field_names, vec!["a", "b"]);
3103 }
3104
3105 #[test]
3106 fn test_struct_type_into_iterator_references() {
3107 let fields = vec![
3108 StructField::new("x", DataType::DOUBLE, true),
3109 StructField::new("y", DataType::FLOAT, false),
3110 StructField::new("z", DataType::LONG, true),
3111 ];
3112 let struct_type = StructType::new_unchecked(fields);
3113
3114 let mut field_names = Vec::new();
3116 for field in &struct_type {
3117 field_names.push(field.name().clone());
3118 }
3119 assert_eq!(field_names, vec!["x", "y", "z"]);
3120
3121 assert_eq!(struct_type.field("x").unwrap().name, "x");
3123 }
3124
3125 #[test]
3126 fn test_iterator_exact_size() {
3127 let fields = vec![
3128 StructField::new("field1", DataType::STRING, true),
3129 StructField::new("field2", DataType::INTEGER, false),
3130 StructField::new("field3", DataType::BOOLEAN, true),
3131 StructField::new("field4", DataType::DATE, true),
3132 ];
3133
3134 let struct_type = StructType::new_unchecked(fields.clone());
3136 let ref_iter = struct_type.fields();
3137 assert_eq!(ref_iter.len(), 4);
3138
3139 let struct_type = StructType::new_unchecked(fields.clone());
3141 let into_ref_iter = (&struct_type).into_iter();
3142 assert_eq!(into_ref_iter.len(), 4);
3143
3144 let struct_type = StructType::new_unchecked(fields);
3146 let into_owned_iter = struct_type.into_iter();
3147 assert_eq!(into_owned_iter.len(), 4);
3148 }
3149
3150 #[test]
3151 fn test_iterator_with_metadata() {
3152 let field_with_metadata = StructField::new("test_field", DataType::STRING, true)
3153 .with_metadata([("key1", MetadataValue::String("value1".to_string()))]);
3154
3155 let struct_type = StructType::new_unchecked([field_with_metadata]);
3156
3157 for field in &struct_type {
3159 assert_eq!(field.metadata().len(), 1);
3160 assert_eq!(
3161 field.metadata().get("key1"),
3162 Some(&MetadataValue::String("value1".to_string()))
3163 );
3164 }
3165
3166 for field in struct_type {
3168 assert_eq!(field.metadata().len(), 1);
3169 assert_eq!(
3170 field.metadata().get("key1"),
3171 Some(&MetadataValue::String("value1".to_string()))
3172 );
3173 }
3174 }
3175
3176 #[test]
3177 fn test_empty_struct_type_iterator() {
3178 let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3179
3180 assert_eq!(struct_type.fields().count(), 0);
3182 assert_eq!((&struct_type).into_iter().count(), 0);
3183 assert_eq!(struct_type.into_iter().count(), 0);
3184 }
3185
3186 #[test]
3187 fn test_iterator_order_preservation() {
3188 let fields = vec![
3189 StructField::new("zebra", DataType::STRING, true),
3190 StructField::new("apple", DataType::INTEGER, false),
3191 StructField::new("banana", DataType::BOOLEAN, true),
3192 ];
3193 let struct_type = StructType::new_unchecked(fields);
3194
3195 let field_names: Vec<_> = struct_type.fields().map(|f| f.name()).collect();
3197 assert_eq!(field_names, vec!["zebra", "apple", "banana"]);
3198
3199 let ref_names: Vec<_> = (&struct_type).into_iter().map(|f| f.name()).collect();
3201 assert_eq!(ref_names, vec!["zebra", "apple", "banana"]);
3202
3203 let owned_names: Vec<_> = struct_type.into_iter().map(|f| f.name).collect();
3205 assert_eq!(owned_names, vec!["zebra", "apple", "banana"]);
3206 }
3207
3208 #[test]
3209 fn test_iterator_collect() {
3210 let original_fields = vec![
3211 StructField::new("field1", DataType::STRING, true),
3212 StructField::new("field2", DataType::INTEGER, false),
3213 ];
3214 let struct_type = StructType::new_unchecked(original_fields.clone());
3215
3216 let collected_refs: Vec<&StructField> = struct_type.fields().collect();
3218 assert_eq!(collected_refs.len(), 2);
3219 assert_eq!(collected_refs[0].name, "field1");
3220 assert_eq!(collected_refs[1].name, "field2");
3221
3222 let collected_owned: Vec<StructField> = struct_type.into_iter().collect();
3224 assert_eq!(collected_owned.len(), 2);
3225 assert_eq!(collected_owned[0].name, "field1");
3226 assert_eq!(collected_owned[1].name, "field2");
3227 }
3228
3229 #[test]
3230 fn test_iterator_functional_methods() {
3231 let fields = vec![
3232 StructField::new("nullable_string", DataType::STRING, true),
3233 StructField::new("required_int", DataType::INTEGER, false),
3234 StructField::new("nullable_bool", DataType::BOOLEAN, true),
3235 StructField::new("required_long", DataType::LONG, false),
3236 ];
3237 let struct_type = StructType::new_unchecked(fields);
3238
3239 let nullable_count = struct_type.fields().filter(|f| f.is_nullable()).count();
3241 assert_eq!(nullable_count, 2);
3242
3243 let required_field_names: Vec<_> = struct_type
3245 .fields()
3246 .filter(|f| !f.is_nullable())
3247 .map(|f| f.name())
3248 .collect();
3249 assert_eq!(required_field_names, vec!["required_int", "required_long"]);
3250
3251 for (index, field) in struct_type.fields().enumerate() {
3253 match index {
3254 0 => assert_eq!(field.name, "nullable_string"),
3255 1 => assert_eq!(field.name, "required_int"),
3256 2 => assert_eq!(field.name, "nullable_bool"),
3257 3 => assert_eq!(field.name, "required_long"),
3258 _ => panic!("Unexpected field index: {index}"),
3259 }
3260 }
3261 }
3262
3263 #[test]
3264 fn test_double_ended_iterator_ref() {
3265 let fields = vec![
3266 StructField::new("first", DataType::STRING, true),
3267 StructField::new("second", DataType::INTEGER, false),
3268 StructField::new("third", DataType::BOOLEAN, true),
3269 StructField::new("fourth", DataType::LONG, false),
3270 ];
3271 let struct_type = StructType::new_unchecked(fields);
3272
3273 let mut iter = struct_type.fields();
3275
3276 assert_eq!(iter.next().unwrap().name, "first");
3278 assert_eq!(iter.next().unwrap().name, "second");
3279
3280 assert_eq!(iter.next_back().unwrap().name, "fourth");
3282 assert_eq!(iter.next_back().unwrap().name, "third");
3283
3284 assert!(iter.next().is_none());
3286 assert!(iter.next_back().is_none());
3287 }
3288
3289 #[test]
3290 fn test_double_ended_iterator_owned() {
3291 let fields = vec![
3292 StructField::new("alpha", DataType::STRING, true),
3293 StructField::new("beta", DataType::INTEGER, false),
3294 StructField::new("gamma", DataType::BOOLEAN, true),
3295 ];
3296 let struct_type = StructType::new_unchecked(fields);
3297
3298 let mut iter = struct_type.into_iter();
3300
3301 assert_eq!(iter.next_back().unwrap().name, "gamma");
3303
3304 assert_eq!(iter.next().unwrap().name, "alpha");
3306
3307 assert_eq!(iter.next_back().unwrap().name, "beta");
3309
3310 assert!(iter.next().is_none());
3312 assert!(iter.next_back().is_none());
3313 }
3314
3315 #[test]
3316 fn test_double_ended_iterator_collect_reverse() {
3317 let fields = vec![
3318 StructField::new("one", DataType::STRING, true),
3319 StructField::new("two", DataType::INTEGER, false),
3320 StructField::new("three", DataType::BOOLEAN, true),
3321 ];
3322 let struct_type = StructType::new_unchecked(fields);
3323
3324 let reversed_names: Vec<_> = struct_type.fields().rev().map(|f| f.name()).collect();
3326 assert_eq!(reversed_names, vec!["three", "two", "one"]);
3327
3328 assert_eq!(struct_type.field("two").unwrap().name, "two");
3330 }
3331
3332 #[test]
3333 fn test_double_ended_iterator_with_into_iter_ref() {
3334 let fields = vec![
3335 StructField::new("x", DataType::DOUBLE, true),
3336 StructField::new("y", DataType::FLOAT, false),
3337 StructField::new("z", DataType::LONG, true),
3338 ];
3339 let struct_type = StructType::new_unchecked(fields);
3340
3341 let mut iter = (&struct_type).into_iter();
3343
3344 assert_eq!(iter.next().unwrap().name, "x");
3346 assert_eq!(iter.next_back().unwrap().name, "z");
3347 assert_eq!(iter.next().unwrap().name, "y");
3348
3349 assert!(iter.next().is_none());
3351 assert!(iter.next_back().is_none());
3352 }
3353
3354 #[test]
3355 fn test_fused_iterator_ref() {
3356 let fields = vec![
3357 StructField::new("test1", DataType::STRING, true),
3358 StructField::new("test2", DataType::INTEGER, false),
3359 ];
3360 let struct_type = StructType::new_unchecked(fields);
3361
3362 let mut iter = struct_type.fields();
3364
3365 assert!(iter.next().is_some());
3367 assert!(iter.next().is_some());
3368 assert!(iter.next().is_none());
3369
3370 assert!(iter.next().is_none());
3372 assert!(iter.next().is_none());
3373 assert!(iter.next().is_none());
3374 }
3375
3376 #[test]
3377 fn test_fused_iterator_owned() {
3378 let fields = vec![
3379 StructField::new("item1", DataType::STRING, true),
3380 StructField::new("item2", DataType::INTEGER, false),
3381 ];
3382 let struct_type = StructType::new_unchecked(fields);
3383
3384 let mut iter = struct_type.into_iter();
3386
3387 assert!(iter.next().is_some());
3389 assert!(iter.next().is_some());
3390 assert!(iter.next().is_none());
3391
3392 assert!(iter.next().is_none());
3394 assert!(iter.next().is_none());
3395 assert!(iter.next().is_none());
3396 }
3397
3398 #[test]
3399 fn test_fused_iterator_with_into_iter_ref() {
3400 let fields = vec![StructField::new("field_a", DataType::BOOLEAN, true)];
3401 let struct_type = StructType::new_unchecked(fields);
3402
3403 let mut iter = (&struct_type).into_iter();
3405
3406 assert!(iter.next().is_some());
3408 assert!(iter.next().is_none());
3409
3410 assert!(iter.next().is_none());
3412 assert!(iter.next().is_none());
3413 }
3414
3415 #[test]
3416 fn test_fused_double_ended_iterator_empty() {
3417 let struct_type = StructType::new_unchecked(std::iter::empty::<StructField>());
3418
3419 let mut iter = struct_type.fields();
3421
3422 assert!(iter.next().is_none());
3424 assert!(iter.next_back().is_none());
3425
3426 assert!(iter.next().is_none());
3428 assert!(iter.next_back().is_none());
3429 }
3430
3431 #[test]
3432 fn test_double_ended_iterator_single_element() {
3433 let fields = vec![StructField::new("single", DataType::STRING, true)];
3434 let struct_type = StructType::new_unchecked(fields);
3435
3436 let mut iter = struct_type.fields();
3438
3439 assert_eq!(iter.next().unwrap().name, "single");
3441 assert!(iter.next().is_none());
3442 assert!(iter.next_back().is_none());
3443
3444 let struct_type =
3446 StructType::new_unchecked([StructField::new("single2", DataType::INTEGER, false)]);
3447 let mut iter = struct_type.into_iter();
3448
3449 assert_eq!(iter.next_back().unwrap().name, "single2");
3450 assert!(iter.next().is_none());
3451 assert!(iter.next_back().is_none());
3452 }
3453
3454 #[test]
3455 fn test_metadata_column_spec() -> DeltaResult<()> {
3456 assert_eq!(MetadataColumnSpec::RowIndex.text_value(), "row_index");
3458 assert_eq!(MetadataColumnSpec::RowId.text_value(), "row_id");
3459 assert_eq!(
3460 MetadataColumnSpec::RowCommitVersion.text_value(),
3461 "row_commit_version"
3462 );
3463 assert_eq!(MetadataColumnSpec::FilePath.text_value(), "_file");
3464
3465 assert_eq!(MetadataColumnSpec::RowIndex.data_type(), DataType::LONG);
3467 assert_eq!(MetadataColumnSpec::RowId.data_type(), DataType::LONG);
3468 assert_eq!(
3469 MetadataColumnSpec::RowCommitVersion.data_type(),
3470 DataType::LONG
3471 );
3472 assert_eq!(MetadataColumnSpec::FilePath.data_type(), DataType::STRING);
3473
3474 assert!(!MetadataColumnSpec::RowIndex.nullable());
3476 assert!(!MetadataColumnSpec::RowId.nullable());
3477 assert!(!MetadataColumnSpec::RowCommitVersion.nullable());
3478 assert!(!MetadataColumnSpec::FilePath.nullable());
3479
3480 assert_eq!(MetadataColumnSpec::RowIndex.reserved_field_id(), None);
3482 assert_eq!(MetadataColumnSpec::RowId.reserved_field_id(), None);
3483 assert_eq!(
3484 MetadataColumnSpec::RowCommitVersion.reserved_field_id(),
3485 None
3486 );
3487 assert_eq!(
3488 MetadataColumnSpec::FilePath.reserved_field_id(),
3489 Some(crate::reserved_field_ids::FILE_NAME)
3490 );
3491
3492 assert_eq!(
3494 MetadataColumnSpec::from_str("row_index")?,
3495 MetadataColumnSpec::RowIndex
3496 );
3497 assert_eq!(
3498 MetadataColumnSpec::from_str("row_id")?,
3499 MetadataColumnSpec::RowId
3500 );
3501 assert_eq!(
3502 MetadataColumnSpec::from_str("row_commit_version")?,
3503 MetadataColumnSpec::RowCommitVersion
3504 );
3505 assert_eq!(
3506 MetadataColumnSpec::from_str("_file")?,
3507 MetadataColumnSpec::FilePath
3508 );
3509
3510 assert!(MetadataColumnSpec::from_str("invalid").is_err());
3512
3513 Ok(())
3514 }
3515
3516 #[test]
3517 fn test_create_metadata_column() {
3518 let field =
3519 StructField::create_metadata_column("test_row_index", MetadataColumnSpec::RowIndex);
3520
3521 assert_eq!(field.name(), "test_row_index");
3522 assert_eq!(field.data_type(), &DataType::LONG);
3523 assert!(!field.nullable);
3524 assert!(field.is_metadata_column());
3525 assert_eq!(
3526 field.get_metadata_column_spec(),
3527 Some(MetadataColumnSpec::RowIndex)
3528 );
3529 }
3530
3531 #[test]
3532 fn test_default_row_index_column() {
3533 let field = StructField::default_row_index_column();
3534
3535 assert_eq!(field.name(), "_metadata.row_index");
3536 assert_eq!(field.data_type(), &DataType::LONG);
3537 assert!(!field.nullable);
3538 assert!(field.is_metadata_column());
3539 assert_eq!(
3540 field.get_metadata_column_spec(),
3541 Some(MetadataColumnSpec::RowIndex)
3542 );
3543 }
3544
3545 #[test]
3546 fn test_add_column() -> DeltaResult<()> {
3547 let schema = StructType::try_new([StructField::nullable("col1", DataType::STRING)])?;
3548
3549 let new_field = StructField::nullable("col2", DataType::INTEGER);
3550 let updated_schema = schema.add([new_field])?;
3551
3552 assert_eq!(updated_schema.fields().count(), 2);
3553 assert!(updated_schema.contains("col1"));
3554 assert!(updated_schema.contains("col2"));
3555 Ok(())
3556 }
3557
3558 #[test]
3559 fn test_add_metadata_column() -> DeltaResult<()> {
3560 let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3561
3562 let schema_with_metadata =
3563 schema.add_metadata_column("my_row_index", MetadataColumnSpec::RowIndex)?;
3564
3565 assert_eq!(schema_with_metadata.fields().count(), 2);
3566 assert!(schema_with_metadata.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3567 assert!(schema_with_metadata.contains("my_row_index"));
3568 assert_eq!(
3569 schema_with_metadata.index_of_metadata_column(&MetadataColumnSpec::RowIndex),
3570 Some(&1)
3571 );
3572 Ok(())
3573 }
3574
3575 #[test]
3576 fn test_duplicate_metadata_columns() -> DeltaResult<()> {
3577 let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3578
3579 let schema_with_metadata =
3580 schema.add_metadata_column("row_index1", MetadataColumnSpec::RowIndex)?;
3581
3582 let result =
3584 schema_with_metadata.add_metadata_column("row_index2", MetadataColumnSpec::RowIndex);
3585
3586 assert_result_error_with_message(result, "Duplicate metadata column");
3587 Ok(())
3588 }
3589
3590 #[test]
3591 fn test_duplicate_field_name_case_insensitive() {
3592 let result = StructType::try_new([
3594 StructField::nullable("Value", DataType::INTEGER),
3595 StructField::nullable("value", DataType::STRING),
3596 ]);
3597 assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3598 }
3599
3600 #[test]
3601 fn test_duplicate_field_name_exact() {
3602 let result = StructType::try_new([
3604 StructField::nullable("id", DataType::INTEGER),
3605 StructField::nullable("id", DataType::STRING),
3606 ]);
3607 assert_result_error_with_message(result, "Duplicate field name (case-insensitive)");
3608 }
3609
3610 #[test]
3611 fn test_nested_metadata_columns_validation_struct() -> DeltaResult<()> {
3612 let nested_field_with_metadata =
3614 StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3615 let nested_struct = StructType {
3616 type_name: "struct".into(),
3617 fields: [(
3618 nested_field_with_metadata.name.clone(),
3619 nested_field_with_metadata,
3620 )]
3621 .into_iter()
3622 .collect(),
3623 metadata_columns: HashMap::new(),
3624 };
3625
3626 let result = StructType::try_new([
3627 StructField::nullable("regular_col", DataType::STRING),
3628 StructField::nullable("nested", DataType::Struct(Box::new(nested_struct))),
3629 ]);
3630
3631 assert_result_error_with_message(result, "only allowed at the top level");
3632 Ok(())
3633 }
3634
3635 #[test]
3636 fn test_nested_metadata_columns_validation_array() -> DeltaResult<()> {
3637 let nested_field_with_metadata =
3639 StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3640 let nested_struct = StructType {
3641 type_name: "struct".into(),
3642 fields: [(
3643 nested_field_with_metadata.name.clone(),
3644 nested_field_with_metadata,
3645 )]
3646 .into_iter()
3647 .collect(),
3648 metadata_columns: HashMap::new(),
3649 };
3650 let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct)), true);
3651
3652 let result = StructType::try_new([
3653 StructField::nullable("regular_col", DataType::STRING),
3654 StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
3655 ]);
3656
3657 assert_result_error_with_message(result, "only allowed at the top level");
3658 Ok(())
3659 }
3660
3661 #[test]
3662 fn test_nested_metadata_columns_validation_map() -> DeltaResult<()> {
3663 let nested_field_with_metadata =
3665 StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
3666 let nested_struct = StructType {
3667 type_name: "struct".into(),
3668 fields: [(
3669 nested_field_with_metadata.name.clone(),
3670 nested_field_with_metadata,
3671 )]
3672 .into_iter()
3673 .collect(),
3674 metadata_columns: HashMap::new(),
3675 };
3676
3677 for map_type in [
3678 MapType::new(
3679 DataType::Struct(Box::new(nested_struct.clone())),
3680 DataType::STRING,
3681 true,
3682 ),
3683 MapType::new(
3684 DataType::STRING,
3685 DataType::Struct(Box::new(nested_struct)),
3686 true,
3687 ),
3688 ] {
3689 let result = StructType::try_new([
3690 StructField::nullable("regular_col", DataType::STRING),
3691 StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
3692 ]);
3693
3694 assert_result_error_with_message(result, "only allowed at the top level");
3695 }
3696
3697 Ok(())
3698 }
3699
3700 #[test]
3701 fn test_column_identifier_trait() -> DeltaResult<()> {
3702 let schema = StructType::try_new([
3703 StructField::nullable("regular_col", DataType::STRING),
3704 StructField::create_metadata_column("row_index_col", MetadataColumnSpec::RowIndex),
3705 ])?;
3706
3707 assert!(schema.contains("regular_col"));
3709 assert!(schema.contains("row_index_col"));
3710 assert!(!schema.contains("nonexistent"));
3711
3712 assert!(schema.contains("regular_col"));
3714 assert!(schema.contains("row_index_col"));
3715
3716 assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3718 assert!(!schema.contains_metadata_column(&MetadataColumnSpec::RowId));
3719 Ok(())
3720 }
3721
3722 #[test]
3723 fn test_metadata_column_serialization() -> DeltaResult<()> {
3724 let field = StructField::create_metadata_column("test_row_id", MetadataColumnSpec::RowId);
3725
3726 let json = serde_json::to_string(&field)?;
3728 let deserialized: StructField = serde_json::from_str(&json)?;
3729
3730 assert_eq!(deserialized.name(), field.name());
3731 assert_eq!(deserialized.data_type(), field.data_type());
3732 assert_eq!(deserialized.nullable, field.nullable);
3733 assert!(deserialized.is_metadata_column());
3734 assert_eq!(
3735 deserialized.get_metadata_column_spec(),
3736 Some(MetadataColumnSpec::RowId)
3737 );
3738 Ok(())
3739 }
3740
3741 #[test]
3742 fn test_all_metadata_column_specs() -> DeltaResult<()> {
3743 let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
3744
3745 let schema = schema
3746 .add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?
3747 .add_metadata_column("row_id", MetadataColumnSpec::RowId)?
3748 .add_metadata_column("row_commit_version", MetadataColumnSpec::RowCommitVersion)?;
3749
3750 assert_eq!(schema.fields().count(), 4);
3751 assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowIndex));
3752 assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowId));
3753 assert!(schema.contains_metadata_column(&MetadataColumnSpec::RowCommitVersion));
3754
3755 assert_eq!(
3756 schema.index_of_metadata_column(&MetadataColumnSpec::RowIndex),
3757 Some(&1)
3758 );
3759 assert_eq!(
3760 schema.index_of_metadata_column(&MetadataColumnSpec::RowId),
3761 Some(&2)
3762 );
3763 assert_eq!(
3764 schema.index_of_metadata_column(&MetadataColumnSpec::RowCommitVersion),
3765 Some(&3)
3766 );
3767 Ok(())
3768 }
3769
3770 #[test]
3771 fn test_physical_name_with_mode_none() {
3772 let field_json = r#"{
3773 "name": "logical_name",
3774 "type": "string",
3775 "nullable": true,
3776 "metadata": {
3777 "delta.columnMapping.physicalName": "physical_name_col123"
3778 }
3779 }"#;
3780 let field: StructField = serde_json::from_str(field_json).unwrap();
3781
3782 assert_eq!(field.physical_name(ColumnMappingMode::None), "logical_name");
3784 }
3785
3786 #[test]
3787 fn test_physical_name_with_mode_id() {
3788 let field_json = r#"{
3789 "name": "logical_name",
3790 "type": "string",
3791 "nullable": true,
3792 "metadata": {
3793 "delta.columnMapping.id": 5,
3794 "delta.columnMapping.physicalName": "physical_name_col123"
3795 }
3796 }"#;
3797 let field: StructField = serde_json::from_str(field_json).unwrap();
3798
3799 assert_eq!(
3801 field.physical_name(ColumnMappingMode::Id),
3802 "physical_name_col123"
3803 );
3804 }
3805
3806 #[test]
3807 fn test_physical_name_with_mode_name() {
3808 let field_json = r#"{
3809 "name": "logical_name",
3810 "type": "string",
3811 "nullable": true,
3812 "metadata": {
3813 "delta.columnMapping.physicalName": "physical_name_col456"
3814 }
3815 }"#;
3816 let field: StructField = serde_json::from_str(field_json).unwrap();
3817
3818 assert_eq!(
3820 field.physical_name(ColumnMappingMode::Name),
3821 "physical_name_col456"
3822 );
3823 }
3824
3825 #[test]
3826 fn test_physical_name_fallback_id() {
3827 let field_json = r#"{
3828 "name": "logical_name",
3829 "type": "string",
3830 "nullable": true,
3831 "metadata": {}
3832 }"#;
3833 let field: StructField = serde_json::from_str(field_json).unwrap();
3834
3835 assert_eq!(field.physical_name(ColumnMappingMode::Id), "logical_name");
3837 }
3838
3839 #[test]
3840 fn test_physical_name_fallback_name() {
3841 let field_json = r#"{
3842 "name": "logical_name",
3843 "type": "string",
3844 "nullable": true,
3845 "metadata": {}
3846 }"#;
3847 let field: StructField = serde_json::from_str(field_json).unwrap();
3848
3849 assert_eq!(field.physical_name(ColumnMappingMode::Name), "logical_name");
3851 }
3852
    #[test]
    fn test_display_struct_type_stable_output() -> DeltaResult<()> {
        // Pins the exact `to_string()` rendering of a `StructType`, so any change to the
        // tree-style output (connectors, nesting, nullability, metadata) fails this test.
        //
        // Part 1: a struct that nests another struct, plus array and map columns whose
        // element / key / value types are that same nested struct.
        let nested_field_with_metadata =
            StructField::create_metadata_column("nested_row_index", MetadataColumnSpec::RowIndex);
        let inner_struct =
            StructType::new_unchecked([StructField::new("q", DataType::LONG, false)]);
        let nested_struct = StructType::new_unchecked([
            nested_field_with_metadata,
            StructField::new("x", DataType::DOUBLE, true),
            StructField::new(
                "inner_struct",
                DataType::Struct(Box::new(inner_struct)),
                false,
            ),
        ]);
        let array_type = ArrayType::new(DataType::Struct(Box::new(nested_struct.clone())), true);
        // Key and value are both the nested struct, so the expected output shows `map_key`
        // and `map_value` subtrees with identical children.
        let map_type = MapType::new(
            DataType::Struct(Box::new(nested_struct.clone())),
            DataType::Struct(Box::new(nested_struct.clone())), true,
        );
        let fields = vec![
            StructField::new("x", DataType::DOUBLE, true),
            StructField::new("y", DataType::FLOAT, false),
            StructField::new("z", DataType::LONG, true),
            StructField::new("s", nested_struct.clone(), false),
            StructField::nullable("array_col", DataType::Array(Box::new(array_type))),
            StructField::nullable("map_col", DataType::Map(Box::new(map_type))),
            StructField::new("a", DataType::LONG, true),
        ];

        let struct_type = StructType::new_unchecked(fields);
        // The expected text is a multi-line string literal. Every line after "struct:" starts
        // at column 0 of the source file and must not be re-indented.
        assert_eq!(
            struct_type.to_string(),
            "struct:
├─x: double (is nullable: true, metadata: {})
├─y: float (is nullable: false, metadata: {})
├─z: long (is nullable: true, metadata: {})
├─s: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>> (is nullable: false, metadata: {})
│ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ ├─x: double (is nullable: true, metadata: {})
│ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ └─q: long (is nullable: false, metadata: {})
├─array_col: array<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
│ └─array_element: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
│ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ ├─x: double (is nullable: true, metadata: {})
│ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ └─q: long (is nullable: false, metadata: {})
├─map_col: map<struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>, struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>> (is nullable: true, metadata: {})
│ ├─map_key: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
│ │ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ │ ├─x: double (is nullable: true, metadata: {})
│ │ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ │ └─q: long (is nullable: false, metadata: {})
│ └─map_value: struct<nested_row_index: long, x: double, inner_struct: struct<q: long>>
│ ├─nested_row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
│ ├─x: double (is nullable: true, metadata: {})
│ └─inner_struct: struct<q: long> (is nullable: false, metadata: {})
│ └─q: long (is nullable: false, metadata: {})
└─a: long (is nullable: true, metadata: {})
"
        );

        // Part 2: top-level metadata columns appended with `add_metadata_column` are
        // rendered with their `delta.metadataSpec` metadata entry.
        let schema = StructType::try_new([StructField::nullable("regular_col", DataType::STRING)])?;
        let schema = schema
            .add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?
            .add_metadata_column("row_id", MetadataColumnSpec::RowId)?
            .add_metadata_column("row_commit_version", MetadataColumnSpec::RowCommitVersion)?;
        assert_eq!(schema.to_string(), "struct:
├─regular_col: string (is nullable: true, metadata: {})
├─row_index: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_index\")})
├─row_id: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_id\")})
└─row_commit_version: long (is nullable: false, metadata: {delta.metadataSpec: String(\"row_commit_version\")})
");
        Ok(())
    }
3930
3931 #[test]
3932 fn test_builder_empty() {
3933 let schema = StructType::builder().build().unwrap();
3934 assert_eq!(schema.num_fields(), 0)
3935 }
3936
3937 #[test]
3938 fn test_builder_add_fields() {
3939 let schema = StructType::builder()
3940 .add_field(StructField::new("id", DataType::INTEGER, false))
3941 .add_field(StructField::new("name", DataType::STRING, true))
3942 .build()
3943 .unwrap();
3944
3945 assert_eq!(schema.num_fields(), 2);
3946 assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
3947 assert_eq!(schema.field_at_index(1).unwrap().name(), "name");
3948 }
3949
3950 #[test]
3951 fn test_builder_from_schema() {
3952 let base_schema =
3953 StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
3954
3955 let extended_schema = StructTypeBuilder::from_schema(&base_schema)
3956 .add_field(StructField::new("name", DataType::STRING, true))
3957 .build()
3958 .unwrap();
3959
3960 assert_eq!(extended_schema.num_fields(), 2);
3961 assert_eq!(extended_schema.field_at_index(0).unwrap().name(), "id");
3962 assert_eq!(extended_schema.field_at_index(1).unwrap().name(), "name");
3963 }
3964
3965 #[test]
3966 fn test_parquet_field_id_key_value() {
3967 assert_eq!(
3971 ColumnMetadataKey::ParquetFieldId.as_ref(),
3972 "parquet.field.id"
3973 );
3974 }
3975
3976 #[test]
3977 fn test_with_field_inserted_empty_struct() {
3978 let schema = StructType::try_new([]).unwrap();
3979 let schema = schema
3980 .with_field_inserted_after(None, StructField::new("age", DataType::STRING, true))
3981 .expect("with field inserted should produce a valid schema");
3982 assert_eq!(schema.num_fields(), 1);
3983 assert_eq!(schema.field_at_index(0).unwrap().name(), "age");
3984 }
3985
3986 #[test]
3987 fn test_with_field_inserted() {
3988 let schema = StructType::try_new([
3989 StructField::new("id", DataType::INTEGER, false),
3990 StructField::new("name", DataType::STRING, true),
3991 ])
3992 .unwrap();
3993 let schema = schema
3994 .with_field_inserted_after(Some("id"), StructField::new("age", DataType::STRING, true))
3995 .expect("with field inserted should produce a valid schema");
3996 assert_eq!(schema.num_fields(), 3);
3997 assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
3998 assert_eq!(schema.field_at_index(1).unwrap().name(), "age");
3999 assert_eq!(schema.field_at_index(2).unwrap().name(), "name");
4000 }
4001
4002 #[test]
4003 fn test_with_field_inserted_append_to_end() {
4004 let schema = StructType::try_new([
4005 StructField::new("id", DataType::INTEGER, false),
4006 StructField::new("name", DataType::STRING, true),
4007 ])
4008 .unwrap();
4009 let schema = schema
4010 .with_field_inserted_after(None, StructField::new("age", DataType::STRING, true))
4011 .expect("with field inserted should produce a valid schema");
4012
4013 assert_eq!(schema.num_fields(), 3);
4014 assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
4015 assert_eq!(schema.field_at_index(1).unwrap().name(), "name");
4016 assert_eq!(schema.field_at_index(2).unwrap().name(), "age");
4017 }
4018
4019 #[test]
4020 fn test_with_field_inserted_after_non_existent_field() {
4021 let schema =
4022 StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4023 let new_schema = schema.with_field_inserted_after(
4024 Some("nonexistent"),
4025 StructField::new("name", DataType::STRING, true),
4026 );
4027 assert!(new_schema.is_err());
4028 }
4029
4030 #[test]
4031 fn test_with_field_inserted_after_duplicate_field() {
4032 let schema = StructType::try_new([
4033 StructField::new("id", DataType::INTEGER, false),
4034 StructField::new("name", DataType::STRING, true),
4035 ])
4036 .unwrap();
4037 let new_schema = schema.with_field_inserted_after(
4038 Some("name"),
4039 StructField::new("id", DataType::STRING, true),
4040 );
4041 assert!(new_schema.is_err());
4042 assert_result_error_with_message(new_schema, "Field id already exists");
4043 }
4044
4045 #[test]
4046 fn test_with_field_inserted_before() {
4047 let schema = StructType::try_new([
4048 StructField::new("id", DataType::INTEGER, false),
4049 StructField::new("name", DataType::STRING, true),
4050 ])
4051 .unwrap();
4052 let schema = schema
4053 .with_field_inserted_before(
4054 Some("name"),
4055 StructField::new("age", DataType::STRING, true),
4056 )
4057 .expect("with field inserted before should produce a valid schema");
4058 assert_eq!(schema.num_fields(), 3);
4059 assert_eq!(schema.field_at_index(0).unwrap().name(), "id");
4060 assert_eq!(schema.field_at_index(1).unwrap().name(), "age");
4061 assert_eq!(schema.field_at_index(2).unwrap().name(), "name");
4062 }
4063
4064 #[test]
4065 fn test_with_field_inserted_before_duplicate_field() {
4066 let schema = StructType::try_new([
4067 StructField::new("id", DataType::INTEGER, false),
4068 StructField::new("name", DataType::STRING, true),
4069 ])
4070 .unwrap();
4071 let new_schema = schema.with_field_inserted_before(
4072 Some("name"),
4073 StructField::new("id", DataType::STRING, true),
4074 );
4075 assert!(new_schema.is_err());
4076 assert_result_error_with_message(new_schema, "Field id already exists");
4077 }
4078
4079 #[test]
4080 fn test_with_field_inserted_before_at_beginning() {
4081 let schema = StructType::try_new([
4082 StructField::new("id", DataType::INTEGER, false),
4083 StructField::new("name", DataType::STRING, true),
4084 ])
4085 .unwrap();
4086 let schema = schema
4087 .with_field_inserted_before(None, StructField::new("age", DataType::STRING, true))
4088 .expect("with field inserted before should produce a valid schema");
4089 assert_eq!(schema.num_fields(), 3);
4090 assert_eq!(schema.field_at_index(0).unwrap().name(), "age");
4091 assert_eq!(schema.field_at_index(1).unwrap().name(), "id");
4092 assert_eq!(schema.field_at_index(2).unwrap().name(), "name");
4093 }
4094
4095 #[test]
4096 fn test_with_field_inserted_before_non_existent_field() {
4097 let schema =
4098 StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4099 let new_schema = schema.with_field_inserted_before(
4100 Some("nonexistent"),
4101 StructField::new("name", DataType::STRING, true),
4102 );
4103 assert!(new_schema.is_err());
4104 }
4105
4106 #[test]
4107 fn test_with_field_inserted_before_empty_struct() {
4108 let schema = StructType::try_new([]).unwrap();
4109 let schema = schema
4110 .with_field_inserted_before(None, StructField::new("age", DataType::STRING, true))
4111 .expect("with field inserted before on empty struct should succeed");
4112 assert_eq!(schema.num_fields(), 1);
4113 assert_eq!(schema.field_at_index(0).unwrap().name(), "age");
4114 }
4115
4116 #[test]
4117 fn test_with_field_removed() {
4118 let schema =
4119 StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4120 let new_schema = schema.with_field_removed("id");
4121 assert_eq!(new_schema.num_fields(), 0);
4122 }
4123
4124 #[test]
4125 fn test_with_field_removed_non_existent_field() {
4126 let schema =
4127 StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4128 let new_schema = schema.with_field_removed("nonexistent");
4129 assert_eq!(new_schema.num_fields(), 1);
4130 assert_eq!(new_schema.field_at_index(0).unwrap().name(), "id");
4131 }
4132
4133 #[test]
4134 fn test_with_field_replaced() {
4135 let schema =
4136 StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4137 let new_schema = schema
4138 .with_field_replaced("id", StructField::new("name", DataType::STRING, true))
4139 .unwrap();
4140
4141 assert_eq!(new_schema.num_fields(), 1);
4142 assert_eq!(new_schema.field_at_index(0).unwrap().name(), "name");
4143 }
4144
4145 #[test]
4146 fn test_with_field_replaced_non_existent_field() {
4147 let schema =
4148 StructType::try_new([StructField::new("id", DataType::INTEGER, false)]).unwrap();
4149 let new_schema = schema.with_field_replaced(
4150 "nonexistent",
4151 StructField::new("name", DataType::STRING, true),
4152 );
4153 assert!(new_schema.is_err(), "Expected error for non-existent field");
4154 }
4155
4156 fn walk_test_schema() -> StructType {
4158 let l3 = StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)]);
4159 let l2 = StructType::new_unchecked([StructField::new(
4160 "b",
4161 DataType::Struct(Box::new(l3)),
4162 false,
4163 )]);
4164 StructType::new_unchecked([StructField::new("a", DataType::Struct(Box::new(l2)), false)])
4165 }
4166
4167 #[rstest::rstest]
4168 #[case::single_level(vec!["a"], vec!["a"], DataType::Struct(Box::new(
4169 StructType::new_unchecked([StructField::new("b", DataType::Struct(Box::new(
4170 StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4171 )), false)])
4172 )))]
4173 #[case::nested_2(vec!["a", "b"], vec!["a", "b"], DataType::Struct(Box::new(
4174 StructType::new_unchecked([StructField::new("c", DataType::DOUBLE, false)])
4175 )))]
4176 #[case::nested_3(vec!["a", "b", "c"], vec!["a", "b", "c"], DataType::DOUBLE)]
4177 #[test]
4178 fn test_walk_column_fields_happy(
4179 #[case] col_path: Vec<&str>,
4180 #[case] expected_names: Vec<&str>,
4181 #[case] expected_leaf_type: DataType,
4182 ) {
4183 let schema = walk_test_schema();
4184 let fields = schema
4185 .walk_column_fields(&ColumnName::new(col_path.iter().copied()))
4186 .unwrap();
4187 assert_eq!(fields.len(), expected_names.len());
4188 for (field, name) in fields.iter().zip(expected_names.iter()) {
4189 assert_eq!(field.name(), *name);
4190 }
4191 assert_eq!(fields.last().unwrap().data_type(), &expected_leaf_type);
4192 }
4193
4194 #[rstest::rstest]
4195 #[case::empty_path(vec![], "Column path cannot be empty")]
4196 #[case::not_found_top(vec!["x"], "not found in schema")]
4197 #[case::not_found_nested(vec!["a", "x"], "not found in schema")]
4198 #[case::intermediate_not_struct(vec!["a", "b", "c", "d"], "not a struct type")]
4199 #[test]
4200 fn test_walk_column_fields_error(#[case] col_path: Vec<&str>, #[case] expected_error: &str) {
4201 let schema = walk_test_schema();
4202 let result = schema.walk_column_fields(&ColumnName::new(col_path.iter().copied()));
4203 assert_result_error_with_message(result, expected_error);
4204 }
4205
4206 #[test]
4207 fn test_normalize_column_names_to_schema_casing() {
4208 let inner =
4209 StructType::new_unchecked(vec![StructField::new("City", DataType::STRING, false)]);
4210 let schema = StructType::new_unchecked(vec![
4211 StructField::new("id", DataType::INTEGER, false),
4212 StructField::new("EventDate", DataType::DATE, false),
4213 StructField::new("Address", DataType::Struct(Box::new(inner)), false),
4214 ]);
4215
4216 let cols = vec![ColumnName::new(["eventdate"])];
4218 assert_eq!(
4219 normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4220 ["EventDate"]
4221 );
4222
4223 let cols = vec![ColumnName::new(["address", "city"])];
4225 assert_eq!(
4226 normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4227 ["Address", "City"]
4228 );
4229
4230 let cols = vec![ColumnName::new(["id"])];
4232 assert_eq!(
4233 normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4234 ["id"]
4235 );
4236
4237 let cols = vec![ColumnName::new(["nonexistent"])];
4239 assert_eq!(
4240 normalize_column_names_to_schema_casing(&schema, &cols)[0].path(),
4241 ["nonexistent"]
4242 );
4243 }
4244}