1use arrow::{
24 array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
25 compute::can_cast_types,
26 datatypes::{DataType, Field, Schema, SchemaRef},
27};
28use datafusion_common::{
29 format::DEFAULT_CAST_OPTIONS,
30 nested_struct::{cast_column, validate_struct_compatibility},
31 plan_err, ColumnStatistics,
32};
33use std::{fmt::Debug, sync::Arc};
/// Signature of the callable used to cast a single column.
///
/// The callable receives the array to convert, the [`Field`] it is being
/// converted for, and the [`arrow::compute::CastOptions`] to apply, and
/// returns the converted array or an error.
///
/// This is an unsized `dyn Fn` type that is also `Send + Sync`, so it has to
/// be held behind a pointer; `SchemaMapping` stores it as `Arc<CastColumnFn>`.
pub type CastColumnFn = dyn Fn(
        &ArrayRef,
        &Field,
        &arrow::compute::CastOptions,
    ) -> datafusion_common::Result<ArrayRef>
    + Send
    + Sync;
43
44pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
53 fn create(
62 &self,
63 projected_table_schema: SchemaRef,
64 table_schema: SchemaRef,
65 ) -> Box<dyn SchemaAdapter>;
66
67 fn create_with_projected_schema(
72 &self,
73 projected_table_schema: SchemaRef,
74 ) -> Box<dyn SchemaAdapter> {
75 self.create(Arc::clone(&projected_table_schema), projected_table_schema)
76 }
77}
78
/// Adapts a file schema to a table schema.
pub trait SchemaAdapter: Send + Sync {
    /// Maps the column at `index` of the table schema to a column index in
    /// `file_schema`.
    ///
    /// Returns `None` when the file has no corresponding column. The default
    /// implementation in this file matches columns by field name.
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Builds the mapping between `file_schema` and the table schema.
    ///
    /// On success returns a [`SchemaMapper`] that converts batches read from
    /// the file, together with a list of column indices. In the default
    /// implementation in this file those indices are the positions, within
    /// `file_schema`, of the file columns that take part in the mapping.
    ///
    /// # Errors
    /// Returns an error when the mapping cannot be built (for the default
    /// implementation: when a matching column cannot be cast).
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}
116
/// Converts data and statistics read from a file into the shape expected by
/// the table schema.
pub trait SchemaMapper: Debug + Send + Sync {
    /// Converts a batch read from a file into a batch that follows the table
    /// schema, consuming the input batch.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

    /// Converts per-column statistics of a file into per-column statistics
    /// for the table schema.
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}
130
131#[derive(Clone, Debug, Default)]
217pub struct DefaultSchemaAdapterFactory;
218
219impl DefaultSchemaAdapterFactory {
220 pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
227 Self.create(Arc::clone(&table_schema), table_schema)
228 }
229}
230
231impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
232 fn create(
233 &self,
234 projected_table_schema: SchemaRef,
235 _table_schema: SchemaRef,
236 ) -> Box<dyn SchemaAdapter> {
237 Box::new(DefaultSchemaAdapter {
238 projected_table_schema,
239 })
240 }
241}
242
/// Default [`SchemaAdapter`]: matches file columns to table columns by name
/// and relies on casting to reconcile differing data types.
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// Table schema (after projection) that file data is adapted to.
    projected_table_schema: SchemaRef,
}
251
252pub(crate) fn can_cast_field(
256 file_field: &Field,
257 table_field: &Field,
258) -> datafusion_common::Result<bool> {
259 match (file_field.data_type(), table_field.data_type()) {
260 (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
261 validate_struct_compatibility(source_fields, target_fields)?;
263 Ok(true)
264 }
265 _ => {
266 if can_cast_types(file_field.data_type(), table_field.data_type()) {
267 Ok(true)
268 } else {
269 plan_err!(
270 "Cannot cast file schema field {} of type {} to table schema field of type {}",
271 file_field.name(),
272 file_field.data_type(),
273 table_field.data_type()
274 )
275 }
276 }
277 }
278}
279
280impl SchemaAdapter for DefaultSchemaAdapter {
281 fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
286 let field = self.projected_table_schema.field(index);
287 Some(file_schema.fields.find(field.name())?.0)
288 }
289
290 fn map_schema(
300 &self,
301 file_schema: &Schema,
302 ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
303 let (field_mappings, projection) = create_field_mapping(
304 file_schema,
305 &self.projected_table_schema,
306 can_cast_field,
307 )?;
308
309 Ok((
310 Arc::new(SchemaMapping::new(
311 Arc::clone(&self.projected_table_schema),
312 field_mappings,
313 Arc::new(
314 |array: &ArrayRef,
315 field: &Field,
316 opts: &arrow::compute::CastOptions| {
317 cast_column(array, field, opts)
318 },
319 ),
320 )),
321 projection,
322 ))
323 }
324}
325
326pub(crate) fn create_field_mapping<F>(
333 file_schema: &Schema,
334 projected_table_schema: &SchemaRef,
335 can_map_field: F,
336) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
337where
338 F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
339{
340 let mut projection = Vec::with_capacity(file_schema.fields().len());
341 let mut field_mappings = vec![None; projected_table_schema.fields().len()];
342
343 for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
344 if let Some((table_idx, table_field)) =
345 projected_table_schema.fields().find(file_field.name())
346 {
347 if can_map_field(file_field, table_field)? {
348 field_mappings[table_idx] = Some(projection.len());
349 projection.push(file_idx);
350 }
351 }
352 }
353
354 Ok((field_mappings, projection))
355}
356
/// [`SchemaMapper`] that maps batches onto `projected_table_schema` using a
/// per-field index table and a column cast function.
pub struct SchemaMapping {
    /// Schema of the batches produced by `map_batch`.
    projected_table_schema: SchemaRef,
    /// One entry per field of `projected_table_schema`: the index of the
    /// matching column in the incoming batch, or `None` when the incoming
    /// batch has no such column.
    field_mappings: Vec<Option<usize>>,
    /// Function used to convert a batch column to its table field.
    cast_column: Arc<CastColumnFn>,
}
380
381impl Debug for SchemaMapping {
382 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
383 f.debug_struct("SchemaMapping")
384 .field("projected_table_schema", &self.projected_table_schema)
385 .field("field_mappings", &self.field_mappings)
386 .field("cast_column", &"<fn>")
387 .finish()
388 }
389}
390
391impl SchemaMapping {
392 pub fn new(
396 projected_table_schema: SchemaRef,
397 field_mappings: Vec<Option<usize>>,
398 cast_column: Arc<CastColumnFn>,
399 ) -> Self {
400 Self {
401 projected_table_schema,
402 field_mappings,
403 cast_column,
404 }
405 }
406}
407
408impl SchemaMapper for SchemaMapping {
409 fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
413 let (_old_schema, batch_cols, batch_rows) = batch.into_parts();
414
415 let cols = self
416 .projected_table_schema
417 .fields()
419 .iter()
420 .zip(&self.field_mappings)
423 .map(|(field, file_idx)| {
425 file_idx.map_or_else(
426 || Ok(new_null_array(field.data_type(), batch_rows)),
429 |batch_idx| {
432 (self.cast_column)(
433 &batch_cols[batch_idx],
434 field,
435 &DEFAULT_CAST_OPTIONS,
436 )
437 },
438 )
439 })
440 .collect::<datafusion_common::Result<Vec<_>, _>>()?;
441
442 let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));
444
445 let schema = Arc::clone(&self.projected_table_schema);
446 let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
447 Ok(record_batch)
448 }
449
450 fn map_column_statistics(
452 &self,
453 file_col_statistics: &[ColumnStatistics],
454 ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
455 let mut table_col_statistics = vec![];
456
457 for (_, file_col_idx) in self
460 .projected_table_schema
461 .fields()
462 .iter()
463 .zip(&self.field_mappings)
464 {
465 if let Some(file_col_idx) = file_col_idx {
466 table_col_statistics.push(
467 file_col_statistics
468 .get(*file_col_idx)
469 .cloned()
470 .unwrap_or_default(),
471 );
472 } else {
473 table_col_statistics.push(ColumnStatistics::new_unknown());
474 }
475 }
476
477 Ok(table_col_statistics)
478 }
479}
480
481#[cfg(test)]
482mod tests {
483 use super::*;
484 use arrow::{
485 array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
486 compute::cast,
487 datatypes::{DataType, Field, TimeUnit},
488 record_batch::RecordBatch,
489 };
490 use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};
491
492 #[test]
493 fn test_schema_mapping_map_statistics_basic() {
494 let table_schema = Arc::new(Schema::new(vec![
496 Field::new("a", DataType::Int32, true),
497 Field::new("b", DataType::Utf8, true),
498 Field::new("c", DataType::Float64, true),
499 ]));
500
501 let file_schema = Schema::new(vec![
503 Field::new("b", DataType::Utf8, true),
504 Field::new("a", DataType::Int32, true),
505 ]);
506
507 let adapter = DefaultSchemaAdapter {
509 projected_table_schema: Arc::clone(&table_schema),
510 };
511
512 let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();
514
515 assert_eq!(projection, vec![0, 1]);
517
518 let mut file_stats = Statistics::default();
520
521 let b_stats = ColumnStatistics {
523 null_count: Precision::Exact(5),
524 ..Default::default()
525 };
526
527 let a_stats = ColumnStatistics {
529 null_count: Precision::Exact(10),
530 ..Default::default()
531 };
532
533 file_stats.column_statistics = vec![b_stats, a_stats];
534
535 let table_col_stats = mapper
537 .map_column_statistics(&file_stats.column_statistics)
538 .unwrap();
539
540 assert_eq!(table_col_stats.len(), 3);
542 assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); assert_eq!(table_col_stats[2].null_count, Precision::Absent); }
546
547 #[test]
548 fn test_schema_mapping_map_statistics_empty() {
549 let table_schema = Arc::new(Schema::new(vec![
551 Field::new("a", DataType::Int32, true),
552 Field::new("b", DataType::Utf8, true),
553 ]));
554 let file_schema = Schema::new(vec![
555 Field::new("a", DataType::Int32, true),
556 Field::new("b", DataType::Utf8, true),
557 ]);
558
559 let adapter = DefaultSchemaAdapter {
560 projected_table_schema: Arc::clone(&table_schema),
561 };
562 let (mapper, _) = adapter.map_schema(&file_schema).unwrap();
563
564 let file_stats = Statistics::default();
566 let table_col_stats = mapper
567 .map_column_statistics(&file_stats.column_statistics)
568 .unwrap();
569
570 assert_eq!(table_col_stats.len(), 2);
572 assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),);
573 assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),);
574 }
575
576 #[test]
577 fn test_can_cast_field() {
578 let from_field = Field::new("col", DataType::Int32, true);
580 let to_field = Field::new("col", DataType::Int32, true);
581 assert!(can_cast_field(&from_field, &to_field).unwrap());
582
583 let from_field = Field::new("col", DataType::Int32, true);
585 let to_field = Field::new("col", DataType::Float64, true);
586 assert!(can_cast_field(&from_field, &to_field).unwrap());
587
588 let from_field = Field::new("col", DataType::Float64, true);
590 let to_field = Field::new("col", DataType::Utf8, true);
591 assert!(can_cast_field(&from_field, &to_field).unwrap());
592
593 let from_field = Field::new("col", DataType::Binary, true);
596 let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
597 let result = can_cast_field(&from_field, &to_field);
598 assert!(result.is_err());
599 let error_msg = result.unwrap_err().to_string();
600 assert!(error_msg.contains("Cannot cast file schema field col"));
601 }
602
603 #[test]
604 fn test_create_field_mapping() {
605 let table_schema = Arc::new(Schema::new(vec![
607 Field::new("a", DataType::Int32, true),
608 Field::new("b", DataType::Utf8, true),
609 Field::new("c", DataType::Float64, true),
610 ]));
611
612 let file_schema = Schema::new(vec![
614 Field::new("b", DataType::Float64, true), Field::new("a", DataType::Int32, true), Field::new("d", DataType::Boolean, true), ]);
618
619 let allow_all = |_: &Field, _: &Field| Ok(true);
621
622 let (field_mappings, projection) =
624 create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();
625
626 assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
631 assert_eq!(projection, vec![0, 1]); let fails_all = |_: &Field, _: &Field| Ok(false);
635 let (field_mappings, projection) =
636 create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();
637
638 assert_eq!(field_mappings, vec![None, None, None]);
640 assert_eq!(projection, Vec::<usize>::new());
641
642 let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
644 let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
645 assert!(result.is_err());
646 assert!(result.unwrap_err().to_string().contains("Test error"));
647 }
648
649 #[test]
650 fn test_schema_mapping_new() {
651 let projected_schema = Arc::new(Schema::new(vec![
653 Field::new("a", DataType::Int32, true),
654 Field::new("b", DataType::Utf8, true),
655 ]));
656
657 let field_mappings = vec![Some(1), Some(0)];
659
660 let mapping = SchemaMapping::new(
662 Arc::clone(&projected_schema),
663 field_mappings.clone(),
664 Arc::new(
665 |array: &ArrayRef, field: &Field, opts: &arrow::compute::CastOptions| {
666 cast_column(array, field, opts)
667 },
668 ),
669 );
670
671 assert_eq!(*mapping.projected_table_schema, *projected_schema);
673 assert_eq!(mapping.field_mappings, field_mappings);
674
675 let batch = RecordBatch::try_new(
677 Arc::new(Schema::new(vec![
678 Field::new("b_file", DataType::Utf8, true),
679 Field::new("a_file", DataType::Int32, true),
680 ])),
681 vec![
682 Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
683 Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
684 ],
685 )
686 .unwrap();
687
688 let mapped_batch = mapping.map_batch(batch).unwrap();
690
691 assert_eq!(*mapped_batch.schema(), *projected_schema);
693 assert_eq!(mapped_batch.num_columns(), 2);
694 assert_eq!(mapped_batch.column(0).len(), 2); assert_eq!(mapped_batch.column(1).len(), 2); }
697
698 #[test]
699 fn test_map_schema_error_path() {
700 let table_schema = Arc::new(Schema::new(vec![
702 Field::new("a", DataType::Int32, true),
703 Field::new("b", DataType::Utf8, true),
704 Field::new("c", DataType::Decimal128(10, 2), true), ]));
706
707 let file_schema = Schema::new(vec![
709 Field::new("a", DataType::Int32, true),
710 Field::new("b", DataType::Float64, true), Field::new("c", DataType::Binary, true), ]);
713
714 let adapter = DefaultSchemaAdapter {
716 projected_table_schema: Arc::clone(&table_schema),
717 };
718
719 let result = adapter.map_schema(&file_schema);
721 assert!(result.is_err());
722 let error_msg = result.unwrap_err().to_string();
723 assert!(error_msg.contains("Cannot cast file schema field c"));
724 }
725
726 #[test]
727 fn test_map_schema_happy_path() {
728 let table_schema = Arc::new(Schema::new(vec![
730 Field::new("a", DataType::Int32, true),
731 Field::new("b", DataType::Utf8, true),
732 Field::new("c", DataType::Decimal128(10, 2), true),
733 ]));
734
735 let adapter = DefaultSchemaAdapter {
737 projected_table_schema: Arc::clone(&table_schema),
738 };
739
740 let compatible_file_schema = Schema::new(vec![
742 Field::new("a", DataType::Int64, true), Field::new("b", DataType::Float64, true), ]);
745
746 let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();
748
749 assert_eq!(projection, vec![0, 1]); let file_batch = RecordBatch::try_new(
754 Arc::new(compatible_file_schema.clone()),
755 vec![
756 Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
757 Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
758 ],
759 )
760 .unwrap();
761
762 let mapped_batch = mapper.map_batch(file_batch).unwrap();
763
764 assert_eq!(*mapped_batch.schema(), *table_schema);
766 assert_eq!(mapped_batch.num_columns(), 3); let c_array = mapped_batch.column(2);
770 assert_eq!(c_array.len(), 2);
771 assert_eq!(c_array.null_count(), 2);
772 }
773
774 #[test]
775 fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
776 let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
777 let batch = create_test_batch_with_struct_data(&file_schema)?;
778
779 let adapter = DefaultSchemaAdapter {
780 projected_table_schema: Arc::clone(&table_schema),
781 };
782 let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
783 let mapped_batch = mapper.map_batch(batch)?;
784
785 verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
786 Ok(())
787 }
788
789 #[test]
790 fn test_map_column_statistics_struct() -> Result<()> {
791 let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
792
793 let adapter = DefaultSchemaAdapter {
794 projected_table_schema: Arc::clone(&table_schema),
795 };
796 let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
797
798 let file_stats = vec![
799 create_test_column_statistics(
800 0,
801 100,
802 Some(ScalarValue::Int32(Some(1))),
803 Some(ScalarValue::Int32(Some(100))),
804 Some(ScalarValue::Int32(Some(5100))),
805 ),
806 create_test_column_statistics(10, 50, None, None, None),
807 ];
808
809 let table_stats = mapper.map_column_statistics(&file_stats)?;
810 assert_eq!(table_stats.len(), 1);
811 verify_column_statistics(
812 &table_stats[0],
813 Some(0),
814 Some(100),
815 Some(ScalarValue::Int32(Some(1))),
816 Some(ScalarValue::Int32(Some(100))),
817 Some(ScalarValue::Int32(Some(5100))),
818 );
819 let missing_stats = mapper.map_column_statistics(&[])?;
820 assert_eq!(missing_stats.len(), 1);
821 assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
822 Ok(())
823 }
824
825 fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
826 let file_schema = Arc::new(Schema::new(vec![Field::new(
827 "info",
828 DataType::Struct(
829 vec![
830 Field::new("location", DataType::Utf8, true),
831 Field::new(
832 "timestamp_utc",
833 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
834 true,
835 ),
836 ]
837 .into(),
838 ),
839 true,
840 )]));
841
842 let table_schema = Arc::new(Schema::new(vec![Field::new(
843 "info",
844 DataType::Struct(
845 vec![
846 Field::new("location", DataType::Utf8, true),
847 Field::new(
848 "timestamp_utc",
849 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
850 true,
851 ),
852 Field::new(
853 "reason",
854 DataType::Struct(
855 vec![
856 Field::new("_level", DataType::Float64, true),
857 Field::new(
858 "details",
859 DataType::Struct(
860 vec![
861 Field::new("rurl", DataType::Utf8, true),
862 Field::new("s", DataType::Float64, true),
863 Field::new("t", DataType::Utf8, true),
864 ]
865 .into(),
866 ),
867 true,
868 ),
869 ]
870 .into(),
871 ),
872 true,
873 ),
874 ]
875 .into(),
876 ),
877 true,
878 )]));
879
880 (file_schema, table_schema)
881 }
882
883 fn create_test_batch_with_struct_data(
884 file_schema: &SchemaRef,
885 ) -> Result<RecordBatch> {
886 let mut location_builder = StringBuilder::new();
887 location_builder.append_value("San Francisco");
888 location_builder.append_value("New York");
889
890 let timestamp_array = TimestampMillisecondArray::from(vec![
891 Some(1640995200000),
892 Some(1641081600000),
893 ]);
894
895 let timestamp_type =
896 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
897 let timestamp_array = cast(×tamp_array, ×tamp_type)?;
898
899 let info_struct = StructArray::from(vec![
900 (
901 Arc::new(Field::new("location", DataType::Utf8, true)),
902 Arc::new(location_builder.finish()) as ArrayRef,
903 ),
904 (
905 Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
906 timestamp_array,
907 ),
908 ]);
909
910 Ok(RecordBatch::try_new(
911 Arc::clone(file_schema),
912 vec![Arc::new(info_struct)],
913 )?)
914 }
915
916 fn verify_adapted_batch_with_nested_fields(
917 mapped_batch: &RecordBatch,
918 table_schema: &SchemaRef,
919 ) -> Result<()> {
920 assert_eq!(mapped_batch.schema(), *table_schema);
921 assert_eq!(mapped_batch.num_rows(), 2);
922
923 let info_col = mapped_batch.column(0);
924 let info_array = info_col
925 .as_any()
926 .downcast_ref::<StructArray>()
927 .expect("Expected info column to be a StructArray");
928
929 verify_preserved_fields(info_array)?;
930 verify_reason_field_structure(info_array)?;
931 Ok(())
932 }
933
934 fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
935 let location_col = info_array
936 .column_by_name("location")
937 .expect("Expected location field in struct");
938 let location_array = location_col
939 .as_any()
940 .downcast_ref::<arrow::array::StringArray>()
941 .expect("Expected location to be a StringArray");
942 assert_eq!(location_array.value(0), "San Francisco");
943 assert_eq!(location_array.value(1), "New York");
944
945 let timestamp_col = info_array
946 .column_by_name("timestamp_utc")
947 .expect("Expected timestamp_utc field in struct");
948 let timestamp_array = timestamp_col
949 .as_any()
950 .downcast_ref::<TimestampMillisecondArray>()
951 .expect("Expected timestamp_utc to be a TimestampMillisecondArray");
952 assert_eq!(timestamp_array.value(0), 1640995200000);
953 assert_eq!(timestamp_array.value(1), 1641081600000);
954 Ok(())
955 }
956
957 fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
958 let reason_col = info_array
959 .column_by_name("reason")
960 .expect("Expected reason field in struct");
961 let reason_array = reason_col
962 .as_any()
963 .downcast_ref::<StructArray>()
964 .expect("Expected reason to be a StructArray");
965 assert_eq!(reason_array.fields().len(), 2);
966 assert!(reason_array.column_by_name("_level").is_some());
967 assert!(reason_array.column_by_name("details").is_some());
968
969 let details_col = reason_array
970 .column_by_name("details")
971 .expect("Expected details field in reason struct");
972 let details_array = details_col
973 .as_any()
974 .downcast_ref::<StructArray>()
975 .expect("Expected details to be a StructArray");
976 assert_eq!(details_array.fields().len(), 3);
977 assert!(details_array.column_by_name("rurl").is_some());
978 assert!(details_array.column_by_name("s").is_some());
979 assert!(details_array.column_by_name("t").is_some());
980 for i in 0..2 {
981 assert!(reason_array.is_null(i), "reason field should be null");
982 }
983 Ok(())
984 }
985
986 fn verify_column_statistics(
987 stats: &ColumnStatistics,
988 expected_null_count: Option<usize>,
989 expected_distinct_count: Option<usize>,
990 expected_min: Option<ScalarValue>,
991 expected_max: Option<ScalarValue>,
992 expected_sum: Option<ScalarValue>,
993 ) {
994 if let Some(count) = expected_null_count {
995 assert_eq!(
996 stats.null_count,
997 Precision::Exact(count),
998 "Null count should match expected value"
999 );
1000 }
1001 if let Some(count) = expected_distinct_count {
1002 assert_eq!(
1003 stats.distinct_count,
1004 Precision::Exact(count),
1005 "Distinct count should match expected value"
1006 );
1007 }
1008 if let Some(min) = expected_min {
1009 assert_eq!(
1010 stats.min_value,
1011 Precision::Exact(min),
1012 "Min value should match expected value"
1013 );
1014 }
1015 if let Some(max) = expected_max {
1016 assert_eq!(
1017 stats.max_value,
1018 Precision::Exact(max),
1019 "Max value should match expected value"
1020 );
1021 }
1022 if let Some(sum) = expected_sum {
1023 assert_eq!(
1024 stats.sum_value,
1025 Precision::Exact(sum),
1026 "Sum value should match expected value"
1027 );
1028 }
1029 }
1030
1031 fn create_test_column_statistics(
1032 null_count: usize,
1033 distinct_count: usize,
1034 min_value: Option<ScalarValue>,
1035 max_value: Option<ScalarValue>,
1036 sum_value: Option<ScalarValue>,
1037 ) -> ColumnStatistics {
1038 ColumnStatistics {
1039 null_count: Precision::Exact(null_count),
1040 distinct_count: Precision::Exact(distinct_count),
1041 min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
1042 max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
1043 sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
1044 }
1045 }
1046}