use arrow::{
    array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
    compute::can_cast_types,
    datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{
    nested_struct::{cast_column, validate_struct_compatibility},
    plan_err, ColumnStatistics,
};
use std::{fmt::Debug, sync::Arc};

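/// Signature of the function a [`SchemaMapping`] uses to cast a file column
/// (`&ArrayRef`) into the shape of the target table [`Field`].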
pub type CastColumnFn =
    dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync;

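/// Factory for creating [`SchemaAdapter`]s, which adapt data read from a file
/// (whose schema may differ) to a table's schema.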
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
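    /// Create a [`SchemaAdapter`] that maps file schemas to
    /// `projected_table_schema` (the columns the query outputs);
    /// `table_schema` is the full schema of the table.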
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;

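    /// Convenience variant for when the projected schema and the full table
    /// schema coincide; the default implementation passes the same schema as
    /// both arguments of [`Self::create`].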
    fn create_with_projected_schema(
        &self,
        projected_table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
    }
}

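/// Adapts a file schema to the table schema: locates matching columns and
/// builds a [`SchemaMapper`] that rewrites record batches and statistics.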
pub trait SchemaAdapter: Send + Sync {
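    /// Map `index` in the (projected) table schema to the index of the
    /// corresponding column in `file_schema`, or `None` if it is missing.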
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

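    /// Create a [`SchemaMapper`] for `file_schema`, along with the ordered
    /// list of file column indices that must be read. Errors if a required
    /// column cannot be cast to its table type.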
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

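/// Maps record batches (and their statistics) from a file's schema to the
/// table schema.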
pub trait SchemaMapper: Debug + Send + Sync {
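    /// Adapt a [`RecordBatch`] read from a file to the table schema.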
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

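    /// Adapt per-column statistics from the file to the table schema; table
    /// columns the file lacks get unknown statistics.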
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

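/// Default [`SchemaAdapterFactory`] that maps file columns to table columns by
/// name and casts them with [`cast_column`].
///
/// A sketch of typical usage (marked `ignore`; schemas and data are
/// illustrative, not from a real table):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::array::{Int64Array, RecordBatch};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // The table declares "a" as Int32, but the file stores it as Int64.
/// let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let file_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
///
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema));
/// let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();
/// assert_eq!(projection, vec![0]); // read file column 0
///
/// let file_batch = RecordBatch::try_new(
///     Arc::new(file_schema),
///     vec![Arc::new(Int64Array::from(vec![1, 2]))],
/// )
/// .unwrap();
///
/// // The mapped batch has column "a" as Int32, matching the table schema.
/// let table_batch = mapper.map_batch(file_batch).unwrap();
/// assert_eq!(table_batch.schema(), table_schema);
/// ```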
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
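    /// Create a [`SchemaAdapter`] from a single schema, used as both the
    /// projected and the full table schema.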
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

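/// [`SchemaAdapter`] created by [`DefaultSchemaAdapterFactory`]; matches file
/// columns to the projected table schema by name.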
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The table schema, projected to the queried fields
    projected_table_schema: SchemaRef,
}

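/// Returns whether `file_field` can be cast to `table_field`: struct types are
/// validated recursively with [`validate_struct_compatibility`], all other
/// types with Arrow's [`can_cast_types`]. Produces a plan error when no cast
/// exists.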
pub(crate) fn can_cast_field(
    file_field: &Field,
    table_field: &Field,
) -> datafusion_common::Result<bool> {
    match (file_field.data_type(), table_field.data_type()) {
        (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
            validate_struct_compatibility(source_fields, target_fields)
        }
        _ => {
            if can_cast_types(file_field.data_type(), table_field.data_type()) {
                Ok(true)
            } else {
                plan_err!(
                    "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                    file_field.name(),
                    file_field.data_type(),
                    table_field.data_type()
                )
            }
        }
    }
}

impl SchemaAdapter for DefaultSchemaAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let (field_mappings, projection) = create_field_mapping(
            file_schema,
            &self.projected_table_schema,
            can_cast_field,
        )?;

        Ok((
            Arc::new(SchemaMapping::new(
                Arc::clone(&self.projected_table_schema),
                field_mappings,
                Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
            )),
            projection,
        ))
    }
}

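/// Compute the mapping from a file schema to `projected_table_schema`.
/// Returns `field_mappings` (for each table field, its index into the
/// projected file columns, or `None` if absent) and `projection` (the file
/// column indices to read). `can_map_field` decides whether a file field maps
/// to a table field, and may error.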
pub(crate) fn create_field_mapping<F>(
    file_schema: &Schema,
    projected_table_schema: &SchemaRef,
    can_map_field: F,
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
where
    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
{
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; projected_table_schema.fields().len()];

    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) =
            projected_table_schema.fields().find(file_field.name())
        {
            if can_map_field(file_field, table_field)? {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }

    Ok((field_mappings, projection))
}

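/// [`SchemaMapper`] used by [`DefaultSchemaAdapter`]: rewrites each batch into
/// the projected table schema, casting mapped columns and null-filling table
/// columns the file does not provide.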
pub struct SchemaMapping {
    /// The table schema, projected to the queried fields
    projected_table_schema: SchemaRef,
    /// For each table field, the index into the *projected* file columns, or
    /// `None` if the file does not provide that field
    field_mappings: Vec<Option<usize>>,
    /// Function used to cast each mapped file column to its table field
    cast_column: Arc<CastColumnFn>,
}

impl Debug for SchemaMapping {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SchemaMapping")
            .field("projected_table_schema", &self.projected_table_schema)
            .field("field_mappings", &self.field_mappings)
            .field("cast_column", &"<fn>")
            .finish()
    }
}

impl SchemaMapping {
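    /// Create a new [`SchemaMapping`] from its constituent parts.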
    pub fn new(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
        cast_column: Arc<CastColumnFn>,
    ) -> Self {
        Self {
            projected_table_schema,
            field_mappings,
            cast_column,
        }
    }
}

impl SchemaMapper for SchemaMapping {
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let batch_rows = batch.num_rows();
        let batch_cols = batch.columns().to_vec();

        let cols = self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // Table field is missing from the file: fill with nulls
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // Otherwise cast the projected file column to the table type
                    |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Preserve the row count even if no columns are projected
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }

    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
        let mut table_col_statistics = vec![];

        for (_, file_col_idx) in self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
        {
            if let Some(file_col_idx) = file_col_idx {
                table_col_statistics.push(
                    file_col_statistics
                        .get(*file_col_idx)
                        .cloned()
                        .unwrap_or_default(),
                );
            } else {
                table_col_statistics.push(ColumnStatistics::new_unknown());
            }
        }

        Ok(table_col_statistics)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
        compute::cast,
        datatypes::{DataType, Field, TimeUnit},
        record_batch::RecordBatch,
    };
    use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};

    #[test]
    fn test_schema_mapping_map_statistics_basic() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

        assert_eq!(projection, vec![0, 1]);

        let mut file_stats = Statistics::default();

        let b_stats = ColumnStatistics {
            null_count: Precision::Exact(5),
            ..Default::default()
        };

        let a_stats = ColumnStatistics {
            null_count: Precision::Exact(10),
            ..Default::default()
        };

        file_stats.column_statistics = vec![b_stats, a_stats];

        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        assert_eq!(table_col_stats.len(), 3);
        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10));
        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5));
        assert_eq!(table_col_stats[2].null_count, Precision::Absent);
    }

    #[test]
    fn test_schema_mapping_map_statistics_empty() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

        let file_stats = Statistics::default();
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        assert_eq!(table_col_stats.len(), 2);
        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
    }

    #[test]
    fn test_can_cast_field() {
        // Same type: trivially castable
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Int32, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Int32 -> Float64: castable
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Float64, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Float64 -> Utf8: castable
        let from_field = Field::new("col", DataType::Float64, true);
        let to_field = Field::new("col", DataType::Utf8, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Binary -> Decimal128: not castable, so a plan error is expected
        let from_field = Field::new("col", DataType::Binary, true);
        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
        let result = can_cast_field(&from_field, &to_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field col"));
    }

    #[test]
    fn test_create_field_mapping() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true),
            Field::new("a", DataType::Int32, true),
            Field::new("d", DataType::Boolean, true),
        ]);

        let allow_all = |_: &Field, _: &Field| Ok(true);

        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();

        // Table fields a, b map to projected file columns 1, 0; c is absent
        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
        assert_eq!(projection, vec![0, 1]);

        // If no field can be mapped, nothing is projected
        let fails_all = |_: &Field, _: &Field| Ok(false);
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();

        assert_eq!(field_mappings, vec![None, None, None]);
        assert_eq!(projection, Vec::<usize>::new());

        // Errors from the mapping predicate propagate to the caller
        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Test error"));
    }

    #[test]
    fn test_schema_mapping_new() {
        let projected_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        let field_mappings = vec![Some(1), Some(0)];

        let mapping = SchemaMapping::new(
            Arc::clone(&projected_schema),
            field_mappings.clone(),
            Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
        );

        assert_eq!(*mapping.projected_table_schema, *projected_schema);
        assert_eq!(mapping.field_mappings, field_mappings);

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("b_file", DataType::Utf8, true),
                Field::new("a_file", DataType::Int32, true),
            ])),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
            ],
        )
        .unwrap();

        let mapped_batch = mapping.map_batch(batch).unwrap();

        assert_eq!(*mapped_batch.schema(), *projected_schema);
        assert_eq!(mapped_batch.num_columns(), 2);
        assert_eq!(mapped_batch.column(0).len(), 2);
        assert_eq!(mapped_batch.column(1).len(), 2);
    }

    #[test]
    fn test_map_schema_error_path() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        // "c" is Binary in the file and cannot be cast to Decimal128
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Float64, true),
            Field::new("c", DataType::Binary, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        let result = adapter.map_schema(&file_schema);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field c"));
    }

    #[test]
    fn test_map_schema_happy_path() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Both file columns can be cast to their table counterparts
        let compatible_file_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float64, true),
        ]);

        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

        assert_eq!(projection, vec![0, 1]);

        let file_batch = RecordBatch::try_new(
            Arc::new(compatible_file_schema.clone()),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
            ],
        )
        .unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        assert_eq!(*mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_columns(), 3);

        // Column "c" is missing from the file, so it is filled with nulls
        let c_array = mapped_batch.column(2);
        assert_eq!(c_array.len(), 2);
        assert_eq!(c_array.null_count(), 2);
    }

    #[test]
    fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
        let batch = create_test_batch_with_struct_data(&file_schema)?;

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
        let mapped_batch = mapper.map_batch(batch)?;

        verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
        Ok(())
    }

    #[test]
    fn test_map_column_statistics_struct() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;

        let file_stats = vec![
            create_test_column_statistics(
                0,
                100,
                Some(ScalarValue::Int32(Some(1))),
                Some(ScalarValue::Int32(Some(100))),
                Some(ScalarValue::Int32(Some(5100))),
            ),
            create_test_column_statistics(10, 50, None, None, None),
        ];

        let table_stats = mapper.map_column_statistics(&file_stats)?;
        assert_eq!(table_stats.len(), 1);
        verify_column_statistics(
            &table_stats[0],
            Some(0),
            Some(100),
            Some(ScalarValue::Int32(Some(1))),
            Some(ScalarValue::Int32(Some(100))),
            Some(ScalarValue::Int32(Some(5100))),
        );

        // With no file statistics at all, the table column is unknown
        let missing_stats = mapper.map_column_statistics(&[])?;
        assert_eq!(missing_stats.len(), 1);
        assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
        Ok(())
    }

    fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        let table_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                    Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        (file_schema, table_schema)
    }

    fn create_test_batch_with_struct_data(
        file_schema: &SchemaRef,
    ) -> Result<RecordBatch> {
        let mut location_builder = StringBuilder::new();
        location_builder.append_value("San Francisco");
        location_builder.append_value("New York");

        let timestamp_array = TimestampMillisecondArray::from(vec![
            Some(1640995200000),
            Some(1641081600000),
        ]);

        // The file schema declares the timestamps as UTC, so cast accordingly
        let timestamp_type =
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
        let timestamp_array = cast(&timestamp_array, &timestamp_type)?;

        let info_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("location", DataType::Utf8, true)),
                Arc::new(location_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
                timestamp_array,
            ),
        ]);

        Ok(RecordBatch::try_new(
            Arc::clone(file_schema),
            vec![Arc::new(info_struct)],
        )?)
    }

    fn verify_adapted_batch_with_nested_fields(
        mapped_batch: &RecordBatch,
        table_schema: &SchemaRef,
    ) -> Result<()> {
        assert_eq!(mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_rows(), 2);

        let info_col = mapped_batch.column(0);
        let info_array = info_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected info column to be a StructArray");

        verify_preserved_fields(info_array)?;
        verify_reason_field_structure(info_array)?;
        Ok(())
    }

910
911 fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
912 let location_col = info_array
913 .column_by_name("location")
914 .expect("Expected location field in struct");
915 let location_array = location_col
916 .as_any()
917 .downcast_ref::<arrow::array::StringArray>()
918 .expect("Expected location to be a StringArray");
919 assert_eq!(location_array.value(0), "San Francisco");
920 assert_eq!(location_array.value(1), "New York");
921
922 let timestamp_col = info_array
923 .column_by_name("timestamp_utc")
924 .expect("Expected timestamp_utc field in struct");
925 let timestamp_array = timestamp_col
926 .as_any()
927 .downcast_ref::<TimestampMillisecondArray>()
928 .expect("Expected timestamp_utc to be a TimestampMillisecondArray");
929 assert_eq!(timestamp_array.value(0), 1640995200000);
930 assert_eq!(timestamp_array.value(1), 1641081600000);
931 Ok(())
932 }
933
    fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
        let reason_col = info_array
            .column_by_name("reason")
            .expect("Expected reason field in struct");
        let reason_array = reason_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected reason to be a StructArray");
        assert_eq!(reason_array.fields().len(), 2);
        assert!(reason_array.column_by_name("_level").is_some());
        assert!(reason_array.column_by_name("details").is_some());

        let details_col = reason_array
            .column_by_name("details")
            .expect("Expected details field in reason struct");
        let details_array = details_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected details to be a StructArray");
        assert_eq!(details_array.fields().len(), 3);
        assert!(details_array.column_by_name("rurl").is_some());
        assert!(details_array.column_by_name("s").is_some());
        assert!(details_array.column_by_name("t").is_some());

        // The file provided no "reason" data, so every row must be null
        for i in 0..2 {
            assert!(reason_array.is_null(i), "reason field should be null");
        }
        Ok(())
    }

    fn verify_column_statistics(
        stats: &ColumnStatistics,
        expected_null_count: Option<usize>,
        expected_distinct_count: Option<usize>,
        expected_min: Option<ScalarValue>,
        expected_max: Option<ScalarValue>,
        expected_sum: Option<ScalarValue>,
    ) {
        if let Some(count) = expected_null_count {
            assert_eq!(
                stats.null_count,
                Precision::Exact(count),
                "Null count should match expected value"
            );
        }
        if let Some(count) = expected_distinct_count {
            assert_eq!(
                stats.distinct_count,
                Precision::Exact(count),
                "Distinct count should match expected value"
            );
        }
        if let Some(min) = expected_min {
            assert_eq!(
                stats.min_value,
                Precision::Exact(min),
                "Min value should match expected value"
            );
        }
        if let Some(max) = expected_max {
            assert_eq!(
                stats.max_value,
                Precision::Exact(max),
                "Max value should match expected value"
            );
        }
        if let Some(sum) = expected_sum {
            assert_eq!(
                stats.sum_value,
                Precision::Exact(sum),
                "Sum value should match expected value"
            );
        }
    }

    fn create_test_column_statistics(
        null_count: usize,
        distinct_count: usize,
        min_value: Option<ScalarValue>,
        max_value: Option<ScalarValue>,
        sum_value: Option<ScalarValue>,
    ) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Precision::Exact(null_count),
            distinct_count: Precision::Exact(distinct_count),
            min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
            max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
            sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
        }
    }
}