use arrow::{
    array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
    compute::can_cast_types,
    datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{
    nested_struct::{cast_column, validate_struct_compatibility},
    plan_err, ColumnStatistics,
};
use std::{fmt::Debug, sync::Arc};
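
/// Casting callback used by [`SchemaMapping`]: given a column read from the
/// file and the corresponding table [`Field`], produce an array of the table
/// field's data type.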
pub type CastColumnFn =
    dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync;

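/// Factory for creating [`SchemaAdapter`]s. Implementations control how a
/// file's schema is adapted to the table schema; the default implementation
/// casts matching columns by name and null-fills the rest.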
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`] from the projected table schema and the
    /// full table schema.
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;

    /// Convenience wrapper over [`Self::create`] that passes the projected
    /// schema as both arguments.
    fn create_with_projected_schema(
        &self,
        projected_table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
    }
}

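/// Adapts a file schema to the (projected) table schema: reports which file
/// columns are needed and how to rewrite batches read from the file.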
pub trait SchemaAdapter: Send + Sync {
    /// Map the table-schema column at `index` to the index of the column
    /// with the same name in `file_schema`, if present.
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Create a [`SchemaMapper`] for `file_schema`, along with the projection
    /// of file column indices that must be read.
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

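/// Rewrites batches and column statistics from the file schema to the table
/// schema.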
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapt a [`RecordBatch`] read from the file to the table schema.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

    /// Adapt per-column file statistics to the table schema.
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

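/// Default [`SchemaAdapterFactory`]: matches file columns to table columns by
/// name, casts them where Arrow supports the conversion, and null-fills table
/// columns missing from the file.
///
/// A minimal usage sketch (marked `ignore` because the exact import paths
/// depend on the consuming crate):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let table_schema = Arc::new(Schema::new(vec![
///     Field::new("a", DataType::Int64, true),
/// ]));
/// // The file stores `a` as Int32; the adapter casts it up to Int64.
/// let file_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
///
/// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
/// let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();
/// assert_eq!(projection, vec![0]);
/// // `mapper.map_batch(...)` now casts batches read from the file.
/// ```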
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Shorthand for [`SchemaAdapterFactory::create`] when the projected
    /// schema is the same as the table schema.
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

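/// The [`SchemaAdapter`] created by [`DefaultSchemaAdapterFactory`].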
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema of the table, projected to the queried columns.
    projected_table_schema: SchemaRef,
}

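/// Checks whether `file_field` can be cast to `table_field`: struct types are
/// validated field-by-field via [`validate_struct_compatibility`], all other
/// types defer to Arrow's [`can_cast_types`]. Incompatible types produce a
/// plan error rather than `Ok(false)`.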
pub(crate) fn can_cast_field(
    file_field: &Field,
    table_field: &Field,
) -> datafusion_common::Result<bool> {
    match (file_field.data_type(), table_field.data_type()) {
        (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
            validate_struct_compatibility(source_fields, target_fields)
        }
        _ => {
            if can_cast_types(file_field.data_type(), table_field.data_type()) {
                Ok(true)
            } else {
                plan_err!(
                    "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                    file_field.name(),
                    file_field.data_type(),
                    table_field.data_type()
                )
            }
        }
    }
}

impl SchemaAdapter for DefaultSchemaAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let (field_mappings, projection) = create_field_mapping(
            file_schema,
            &self.projected_table_schema,
            can_cast_field,
        )?;

        Ok((
            Arc::new(SchemaMapping::new(
                Arc::clone(&self.projected_table_schema),
                field_mappings,
                Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
            )),
            projection,
        ))
    }
}

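/// Computes how file columns feed the projected table schema.
///
/// Returns `(field_mappings, projection)`: `field_mappings[table_idx]` is the
/// position among the *projected* file columns that supplies table column
/// `table_idx` (`None` means null-fill), and `projection` lists the file
/// column indices to read, in file order. For example (mirroring the unit
/// test below), with file columns `[b, a, d]` and table columns `[a, b, c]`
/// the result is `field_mappings = [Some(1), Some(0), None]` and
/// `projection = [0, 1]`.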
pub(crate) fn create_field_mapping<F>(
    file_schema: &Schema,
    projected_table_schema: &SchemaRef,
    can_map_field: F,
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
where
    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
{
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; projected_table_schema.fields().len()];

    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) =
            projected_table_schema.fields().find(file_field.name())
        {
            if can_map_field(file_field, table_field)? {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }

    Ok((field_mappings, projection))
}

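/// Default [`SchemaMapper`]: null-fills table columns that are absent from
/// the file and applies the `cast_column` callback to the rest.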
pub struct SchemaMapping {
    /// The schema of the table, projected to the queried columns.
    projected_table_schema: SchemaRef,
    /// For each table column, the index into the projected file columns that
    /// supplies it, or `None` if the column must be null-filled.
    field_mappings: Vec<Option<usize>>,
    /// Callback used to cast each file column to its table field's type.
    cast_column: Arc<CastColumnFn>,
}

impl Debug for SchemaMapping {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SchemaMapping")
            .field("projected_table_schema", &self.projected_table_schema)
            .field("field_mappings", &self.field_mappings)
            .field("cast_column", &"<fn>")
            .finish()
    }
}

impl SchemaMapping {
    /// Creates a new [`SchemaMapping`] from its parts.
    pub fn new(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
        cast_column: Arc<CastColumnFn>,
    ) -> Self {
        Self {
            projected_table_schema,
            field_mappings,
            cast_column,
        }
    }
}

impl SchemaMapper for SchemaMapping {
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let (_old_schema, batch_cols, batch_rows) = batch.into_parts();

        let cols = self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // Table column absent from the file: fill with nulls.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // Otherwise cast the projected batch column to the table type.
                    |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Carry the row count explicitly so batches with no columns keep it.
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }

    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
        let mut table_col_statistics = vec![];

        // Take each table column's statistics from the mapped file column,
        // or mark them unknown when the file does not have the column.
        for (_, file_col_idx) in self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
        {
            if let Some(file_col_idx) = file_col_idx {
                table_col_statistics.push(
                    file_col_statistics
                        .get(*file_col_idx)
                        .cloned()
                        .unwrap_or_default(),
                );
            } else {
                table_col_statistics.push(ColumnStatistics::new_unknown());
            }
        }

        Ok(table_col_statistics)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
        compute::cast,
        datatypes::{DataType, Field, TimeUnit},
        record_batch::RecordBatch,
    };
    use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};

    #[test]
    fn test_schema_mapping_map_statistics_basic() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // The file has the columns in a different order and no `c`.
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

        assert_eq!(projection, vec![0, 1]);

        let mut file_stats = Statistics::default();

        let b_stats = ColumnStatistics {
            null_count: Precision::Exact(5),
            ..Default::default()
        };

        let a_stats = ColumnStatistics {
            null_count: Precision::Exact(10),
            ..Default::default()
        };

        // Statistics are in file-schema order: b, then a.
        file_stats.column_statistics = vec![b_stats, a_stats];

        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // Results are in table-schema order: a, b, then the absent c.
        assert_eq!(table_col_stats.len(), 3);
        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10));
        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5));
        assert_eq!(table_col_stats[2].null_count, Precision::Absent);
    }

    #[test]
    fn test_schema_mapping_map_statistics_empty() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

        // No statistics in the file: every table column gets unknown stats.
        let file_stats = Statistics::default();
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        assert_eq!(table_col_stats.len(), 2);
        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
    }

    #[test]
    fn test_can_cast_field() {
        // Identical types are trivially castable.
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Int32, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Int32 -> Float64 is a supported widening cast.
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Float64, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Float64 -> Utf8 is supported (formats the value as a string).
        let from_field = Field::new("col", DataType::Float64, true);
        let to_field = Field::new("col", DataType::Utf8, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Binary -> Decimal128 is unsupported and must return a plan error.
        let from_field = Field::new("col", DataType::Binary, true);
        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
        let result = can_cast_field(&from_field, &to_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field col"));
    }

    #[test]
    fn test_create_field_mapping() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // File has b and a (in that order) plus an extra column d.
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true),
            Field::new("a", DataType::Int32, true),
            Field::new("d", DataType::Boolean, true),
        ]);

        let allow_all = |_: &Field, _: &Field| Ok(true);

        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();

        // a <- projected column 1, b <- projected column 0, c is null-filled;
        // file columns 0 (b) and 1 (a) are read, while d is ignored.
        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
        assert_eq!(projection, vec![0, 1]);

        // If no field can be mapped, nothing is projected.
        let fails_all = |_: &Field, _: &Field| Ok(false);
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();

        assert_eq!(field_mappings, vec![None, None, None]);
        assert_eq!(projection, Vec::<usize>::new());

        // Errors from the predicate propagate to the caller.
        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Test error"));
    }

    #[test]
    fn test_schema_mapping_new() {
        let projected_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        let field_mappings = vec![Some(1), Some(0)];

        let mapping = SchemaMapping::new(
            Arc::clone(&projected_schema),
            field_mappings.clone(),
            Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
        );

        assert_eq!(*mapping.projected_table_schema, *projected_schema);
        assert_eq!(mapping.field_mappings, field_mappings);

        // Batch columns are in file order: b first, then a.
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("b_file", DataType::Utf8, true),
                Field::new("a_file", DataType::Int32, true),
            ])),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
            ],
        )
        .unwrap();

        let mapped_batch = mapping.map_batch(batch).unwrap();

        assert_eq!(*mapped_batch.schema(), *projected_schema);
        assert_eq!(mapped_batch.num_columns(), 2);
        assert_eq!(mapped_batch.column(0).len(), 2);
        assert_eq!(mapped_batch.column(1).len(), 2);
    }

    #[test]
    fn test_map_schema_error_path() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        // Binary -> Decimal128 cannot be cast, so map_schema must fail on `c`.
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Float64, true),
            Field::new("c", DataType::Binary, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        let result = adapter.map_schema(&file_schema);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field c"));
    }

    #[test]
    fn test_map_schema_happy_path() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Both file columns are castable to their table types; `c` is absent.
        let compatible_file_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float64, true),
        ]);

        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

        assert_eq!(projection, vec![0, 1]);

        let file_batch = RecordBatch::try_new(
            Arc::new(compatible_file_schema.clone()),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
            ],
        )
        .unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        assert_eq!(*mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_columns(), 3);

        // Column `c` is not in the file, so it should be entirely null.
        let c_array = mapped_batch.column(2);
        assert_eq!(c_array.len(), 2);
        assert_eq!(c_array.null_count(), 2);
    }

    #[test]
    fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
        let batch = create_test_batch_with_struct_data(&file_schema)?;

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
        let mapped_batch = mapper.map_batch(batch)?;

        verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
        Ok(())
    }

    #[test]
    fn test_map_column_statistics_struct() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;

        let file_stats = vec![
            create_test_column_statistics(
                0,
                100,
                Some(ScalarValue::Int32(Some(1))),
                Some(ScalarValue::Int32(Some(100))),
                Some(ScalarValue::Int32(Some(5100))),
            ),
            create_test_column_statistics(10, 50, None, None, None),
        ];

        // Only the first file column maps to the single table column.
        let table_stats = mapper.map_column_statistics(&file_stats)?;
        assert_eq!(table_stats.len(), 1);
        verify_column_statistics(
            &table_stats[0],
            Some(0),
            Some(100),
            Some(ScalarValue::Int32(Some(1))),
            Some(ScalarValue::Int32(Some(100))),
            Some(ScalarValue::Int32(Some(5100))),
        );

        // With no file statistics at all, the table column's stats are unknown.
        let missing_stats = mapper.map_column_statistics(&[])?;
        assert_eq!(missing_stats.len(), 1);
        assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
        Ok(())
    }

    /// Builds a file schema whose `info` struct has `location` and
    /// `timestamp_utc`, and a table schema with the same struct plus a nested
    /// `reason` struct that the file does not contain.
    fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        let table_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                    Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        (file_schema, table_schema)
    }

    fn create_test_batch_with_struct_data(
        file_schema: &SchemaRef,
    ) -> Result<RecordBatch> {
        let mut location_builder = StringBuilder::new();
        location_builder.append_value("San Francisco");
        location_builder.append_value("New York");

        let timestamp_array = TimestampMillisecondArray::from(vec![
            Some(1640995200000),
            Some(1641081600000),
        ]);

        // Attach the UTC timezone expected by the file schema.
        let timestamp_type =
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
        let timestamp_array = cast(&timestamp_array, &timestamp_type)?;

        let info_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("location", DataType::Utf8, true)),
                Arc::new(location_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
                timestamp_array,
            ),
        ]);

        Ok(RecordBatch::try_new(
            Arc::clone(file_schema),
            vec![Arc::new(info_struct)],
        )?)
    }

    fn verify_adapted_batch_with_nested_fields(
        mapped_batch: &RecordBatch,
        table_schema: &SchemaRef,
    ) -> Result<()> {
        assert_eq!(mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_rows(), 2);

        let info_col = mapped_batch.column(0);
        let info_array = info_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected info column to be a StructArray");

        verify_preserved_fields(info_array)?;
        verify_reason_field_structure(info_array)?;
        Ok(())
    }

    fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
        let location_col = info_array
            .column_by_name("location")
            .expect("Expected location field in struct");
        let location_array = location_col
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .expect("Expected location to be a StringArray");
        assert_eq!(location_array.value(0), "San Francisco");
        assert_eq!(location_array.value(1), "New York");

        let timestamp_col = info_array
            .column_by_name("timestamp_utc")
            .expect("Expected timestamp_utc field in struct");
        let timestamp_array = timestamp_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("Expected timestamp_utc to be a TimestampMillisecondArray");
        assert_eq!(timestamp_array.value(0), 1640995200000);
        assert_eq!(timestamp_array.value(1), 1641081600000);
        Ok(())
    }

    fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
        let reason_col = info_array
            .column_by_name("reason")
            .expect("Expected reason field in struct");
        let reason_array = reason_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected reason to be a StructArray");
        assert_eq!(reason_array.fields().len(), 2);
        assert!(reason_array.column_by_name("_level").is_some());
        assert!(reason_array.column_by_name("details").is_some());

        let details_col = reason_array
            .column_by_name("details")
            .expect("Expected details field in reason struct");
        let details_array = details_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected details to be a StructArray");
        assert_eq!(details_array.fields().len(), 3);
        assert!(details_array.column_by_name("rurl").is_some());
        assert!(details_array.column_by_name("s").is_some());
        assert!(details_array.column_by_name("t").is_some());

        // The `reason` column was added by the adapter, so every row is null.
        for i in 0..2 {
            assert!(reason_array.is_null(i), "reason field should be null");
        }
        Ok(())
    }

    fn verify_column_statistics(
        stats: &ColumnStatistics,
        expected_null_count: Option<usize>,
        expected_distinct_count: Option<usize>,
        expected_min: Option<ScalarValue>,
        expected_max: Option<ScalarValue>,
        expected_sum: Option<ScalarValue>,
    ) {
        if let Some(count) = expected_null_count {
            assert_eq!(
                stats.null_count,
                Precision::Exact(count),
                "Null count should match expected value"
            );
        }
        if let Some(count) = expected_distinct_count {
            assert_eq!(
                stats.distinct_count,
                Precision::Exact(count),
                "Distinct count should match expected value"
            );
        }
        if let Some(min) = expected_min {
            assert_eq!(
                stats.min_value,
                Precision::Exact(min),
                "Min value should match expected value"
            );
        }
        if let Some(max) = expected_max {
            assert_eq!(
                stats.max_value,
                Precision::Exact(max),
                "Max value should match expected value"
            );
        }
        if let Some(sum) = expected_sum {
            assert_eq!(
                stats.sum_value,
                Precision::Exact(sum),
                "Sum value should match expected value"
            );
        }
    }

    fn create_test_column_statistics(
        null_count: usize,
        distinct_count: usize,
        min_value: Option<ScalarValue>,
        max_value: Option<ScalarValue>,
        sum_value: Option<ScalarValue>,
    ) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Precision::Exact(null_count),
            distinct_count: Precision::Exact(distinct_count),
            min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
            max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
            sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
        }
    }
}