1use std::borrow::Borrow;
23use std::collections::HashMap;
24use std::hash::Hash;
25use std::sync::Arc;
26
27use arrow::compute::can_cast_types;
28use arrow::datatypes::{DataType, Schema, SchemaRef};
29use datafusion_common::{
30 Result, ScalarValue, exec_err,
31 nested_struct::validate_struct_compatibility,
32 tree_node::{Transformed, TransformedResult, TreeNode},
33};
34use datafusion_functions::core::getfield::GetFieldFunc;
35use datafusion_physical_expr::expressions::CastColumnExpr;
36use datafusion_physical_expr::{
37 ScalarFunctionExpr,
38 expressions::{self, Column},
39};
40use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
41
42pub fn replace_columns_with_literals<K, V>(
61 expr: Arc<dyn PhysicalExpr>,
62 replacements: &HashMap<K, V>,
63) -> Result<Arc<dyn PhysicalExpr>>
64where
65 K: Borrow<str> + Eq + Hash,
66 V: Borrow<ScalarValue>,
67{
68 expr.transform_down(|expr| {
69 if let Some(column) = expr.as_any().downcast_ref::<Column>()
70 && let Some(replacement_value) = replacements.get(column.name())
71 {
72 return Ok(Transformed::yes(expressions::lit(
73 replacement_value.borrow().clone(),
74 )));
75 }
76 Ok(Transformed::no(expr))
77 })
78 .data()
79}
80
81pub trait PhysicalExprAdapter: Send + Sync + std::fmt::Debug {
149 fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>;
166}
167
168pub trait PhysicalExprAdapterFactory: Send + Sync + std::fmt::Debug {
172 fn create(
174 &self,
175 logical_file_schema: SchemaRef,
176 physical_file_schema: SchemaRef,
177 ) -> Arc<dyn PhysicalExprAdapter>;
178}
179
180#[derive(Debug, Clone)]
181pub struct DefaultPhysicalExprAdapterFactory;
182
183impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
184 fn create(
185 &self,
186 logical_file_schema: SchemaRef,
187 physical_file_schema: SchemaRef,
188 ) -> Arc<dyn PhysicalExprAdapter> {
189 Arc::new(DefaultPhysicalExprAdapter {
190 logical_file_schema,
191 physical_file_schema,
192 })
193 }
194}
195
196#[derive(Debug, Clone)]
237pub struct DefaultPhysicalExprAdapter {
238 logical_file_schema: SchemaRef,
239 physical_file_schema: SchemaRef,
240}
241
242impl DefaultPhysicalExprAdapter {
243 pub fn new(logical_file_schema: SchemaRef, physical_file_schema: SchemaRef) -> Self {
248 Self {
249 logical_file_schema,
250 physical_file_schema,
251 }
252 }
253}
254
255impl PhysicalExprAdapter for DefaultPhysicalExprAdapter {
256 fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
257 let rewriter = DefaultPhysicalExprAdapterRewriter {
258 logical_file_schema: &self.logical_file_schema,
259 physical_file_schema: &self.physical_file_schema,
260 };
261 expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr)))
262 .data()
263 }
264}
265
266struct DefaultPhysicalExprAdapterRewriter<'a> {
267 logical_file_schema: &'a Schema,
268 physical_file_schema: &'a Schema,
269}
270
271impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
272 fn rewrite_expr(
273 &self,
274 expr: Arc<dyn PhysicalExpr>,
275 ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
276 if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? {
277 return Ok(Transformed::yes(transformed));
278 }
279
280 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
281 return self.rewrite_column(Arc::clone(&expr), column);
282 }
283
284 Ok(Transformed::no(expr))
285 }
286
287 fn try_rewrite_struct_field_access(
291 &self,
292 expr: &Arc<dyn PhysicalExpr>,
293 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
294 let get_field_expr =
295 match ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(expr.as_ref()) {
296 Some(expr) => expr,
297 None => return Ok(None),
298 };
299
300 let source_expr = match get_field_expr.args().first() {
301 Some(expr) => expr,
302 None => return Ok(None),
303 };
304
305 let field_name_expr = match get_field_expr.args().get(1) {
306 Some(expr) => expr,
307 None => return Ok(None),
308 };
309
310 let lit = match field_name_expr
311 .as_any()
312 .downcast_ref::<expressions::Literal>()
313 {
314 Some(lit) => lit,
315 None => return Ok(None),
316 };
317
318 let field_name = match lit.value().try_as_str().flatten() {
319 Some(name) => name,
320 None => return Ok(None),
321 };
322
323 let column = match source_expr.as_any().downcast_ref::<Column>() {
324 Some(column) => column,
325 None => return Ok(None),
326 };
327
328 let physical_field =
329 match self.physical_file_schema.field_with_name(column.name()) {
330 Ok(field) => field,
331 Err(_) => return Ok(None),
332 };
333
334 let physical_struct_fields = match physical_field.data_type() {
335 DataType::Struct(fields) => fields,
336 _ => return Ok(None),
337 };
338
339 if physical_struct_fields
340 .iter()
341 .any(|f| f.name() == field_name)
342 {
343 return Ok(None);
344 }
345
346 let logical_field = match self.logical_file_schema.field_with_name(column.name())
347 {
348 Ok(field) => field,
349 Err(_) => return Ok(None),
350 };
351
352 let logical_struct_fields = match logical_field.data_type() {
353 DataType::Struct(fields) => fields,
354 _ => return Ok(None),
355 };
356
357 let logical_struct_field = match logical_struct_fields
358 .iter()
359 .find(|f| f.name() == field_name)
360 {
361 Some(field) => field,
362 None => return Ok(None),
363 };
364
365 let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?;
366 Ok(Some(expressions::lit(null_value)))
367 }
368
369 fn rewrite_column(
370 &self,
371 expr: Arc<dyn PhysicalExpr>,
372 column: &Column,
373 ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
374 let logical_field = match self.logical_file_schema.field_with_name(column.name())
376 {
377 Ok(field) => field,
378 Err(e) => {
379 if let Ok(physical_field) =
383 self.physical_file_schema.field_with_name(column.name())
384 {
385 physical_field
389 } else {
390 return Err(e.into());
394 }
395 }
396 };
397
398 let physical_column_index = match self
400 .physical_file_schema
401 .index_of(column.name())
402 {
403 Ok(index) => index,
404 Err(_) => {
405 if !logical_field.is_nullable() {
406 return exec_err!(
407 "Non-nullable column '{}' is missing from the physical schema",
408 column.name()
409 );
410 }
411 let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
414 return Ok(Transformed::yes(expressions::lit(null_value)));
415 }
416 };
417 let physical_field = self.physical_file_schema.field(physical_column_index);
418
419 let column = match (
420 column.index() == physical_column_index,
421 logical_field.data_type() == physical_field.data_type(),
422 ) {
423 (true, true) => return Ok(Transformed::no(expr)),
425 (true, _) => column.clone(),
427 (false, _) => {
428 Column::new_with_schema(logical_field.name(), self.physical_file_schema)?
429 }
430 };
431
432 if logical_field.data_type() == physical_field.data_type() {
433 return Ok(Transformed::yes(Arc::new(column)));
435 }
436
437 match (physical_field.data_type(), logical_field.data_type()) {
448 (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => {
449 validate_struct_compatibility(physical_fields, logical_fields)?;
450 }
451 _ => {
452 let is_compatible =
453 can_cast_types(physical_field.data_type(), logical_field.data_type());
454 if !is_compatible {
455 return exec_err!(
456 "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
457 column.name(),
458 physical_field.data_type(),
459 logical_field.data_type()
460 );
461 }
462 }
463 }
464
465 let cast_expr = Arc::new(CastColumnExpr::new(
466 Arc::new(column),
467 Arc::new(physical_field.clone()),
468 Arc::new(logical_field.clone()),
469 None,
470 ));
471
472 Ok(Transformed::yes(cast_expr))
473 }
474}
475
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        BooleanArray, Int32Array, Int64Array, RecordBatch, RecordBatchOptions,
        StringArray, StringViewArray, StructArray,
    };
    use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
    use datafusion_common::{Result, ScalarValue, assert_contains, record_batch};
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{Column, Literal, col, lit};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use itertools::Itertools;
    use std::sync::Arc;

    /// Returns `(physical_schema, logical_schema)` shared by several tests.
    ///
    /// `a` is `Int32` in the file but `Int64` logically, `b` is identical in both,
    /// and `c` exists only in the logical schema.
    fn create_test_schema() -> (Schema, Schema) {
        let physical_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, true),
        ]);

        let logical_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]);

        (physical_schema, logical_schema)
    }

    /// A column whose physical type differs from its logical type is wrapped in a cast.
    #[test]
    fn test_rewrite_column_with_type_cast() {
        let (physical_schema, logical_schema) = create_test_schema();

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("a", 0));

        let result = adapter.rewrite(column_expr).unwrap();

        assert!(result.as_any().downcast_ref::<CastColumnExpr>().is_some());
    }

    /// `(a + 5) OR (c > 0.0)`: `a` gets an Int32 -> Int64 cast and `c`, which is
    /// missing from the file, becomes a typed NULL literal.
    #[test]
    fn test_rewrite_multi_column_expr_with_type_cast() {
        let (physical_schema, logical_schema) = create_test_schema();
        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));

        let column_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let column_c = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;
        let expr = expressions::BinaryExpr::new(
            Arc::clone(&column_a),
            Operator::Plus,
            Arc::new(expressions::Literal::new(ScalarValue::Int64(Some(5)))),
        );
        let expr = expressions::BinaryExpr::new(
            Arc::new(expr),
            Operator::Or,
            Arc::new(expressions::BinaryExpr::new(
                Arc::clone(&column_c),
                Operator::Gt,
                Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
            )),
        );

        let result = adapter.rewrite(Arc::new(expr)).unwrap();

        let expected = expressions::BinaryExpr::new(
            Arc::new(CastColumnExpr::new(
                Arc::new(Column::new("a", 0)),
                Arc::new(Field::new("a", DataType::Int32, false)),
                Arc::new(Field::new("a", DataType::Int64, false)),
                None,
            )),
            Operator::Plus,
            Arc::new(expressions::Literal::new(ScalarValue::Int64(Some(5)))),
        );
        let expected = Arc::new(expressions::BinaryExpr::new(
            Arc::new(expected),
            Operator::Or,
            Arc::new(expressions::BinaryExpr::new(
                lit(ScalarValue::Float64(None)),
                Operator::Gt,
                Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
            )),
        )) as Arc<dyn PhysicalExpr>;

        assert_eq!(
            result.to_string(),
            expected.to_string(),
            "The rewritten expression did not match the expected output"
        );
    }

    /// Struct columns whose child types cannot be cast are rejected.
    #[test]
    fn test_rewrite_struct_column_incompatible() {
        let physical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(vec![Field::new("field1", DataType::Binary, true)].into()),
            true,
        )]);

        let logical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(vec![Field::new("field1", DataType::Int32, true)].into()),
            true,
        )]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("data", 0));

        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
        assert_contains!(
            error_msg,
            "Cannot cast struct field 'field1' from type Binary to type Int32"
        );
    }

    /// Struct columns whose children are castable are wrapped in a single struct cast.
    #[test]
    fn test_rewrite_struct_compatible_cast() {
        let physical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(
                vec![
                    Field::new("id", DataType::Int32, false),
                    Field::new("name", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        )]);

        let logical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(
                vec![
                    Field::new("id", DataType::Int64, false),
                    Field::new("name", DataType::Utf8View, true),
                ]
                .into(),
            ),
            false,
        )]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("data", 0));

        let result = adapter.rewrite(column_expr).unwrap();

        let expected = Arc::new(CastColumnExpr::new(
            Arc::new(Column::new("data", 0)),
            Arc::new(Field::new(
                "data",
                DataType::Struct(
                    vec![
                        Field::new("id", DataType::Int32, false),
                        Field::new("name", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                false,
            )),
            Arc::new(Field::new(
                "data",
                DataType::Struct(
                    vec![
                        Field::new("id", DataType::Int64, false),
                        Field::new("name", DataType::Utf8View, true),
                    ]
                    .into(),
                ),
                false,
            )),
            None,
        )) as Arc<dyn PhysicalExpr>;

        assert_eq!(result.to_string(), expected.to_string());
    }

    /// A nullable column missing from the file becomes a NULL of the logical type.
    #[test]
    fn test_rewrite_missing_column() -> Result<()> {
        let (physical_schema, logical_schema) = create_test_schema();

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("c", 2));

        let result = adapter.rewrite(column_expr)?;

        if let Some(literal) = result.as_any().downcast_ref::<expressions::Literal>() {
            assert_eq!(*literal.value(), ScalarValue::Float64(None));
        } else {
            panic!("Expected literal expression");
        }

        Ok(())
    }

    /// A non-nullable column missing from the file is an error (a cast is also
    /// pending on `a`, which must not mask the error).
    #[test]
    fn test_rewrite_missing_column_non_nullable_error() {
        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let logical_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("b", 1));

        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
        assert_contains!(error_msg, "Non-nullable column 'b' is missing");
    }

    /// A nullable Utf8 column missing from the file becomes a `Utf8(NULL)` literal.
    #[test]
    fn test_rewrite_missing_column_nullable() {
        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let logical_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, true),
        ]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("b", 1));

        let result = adapter.rewrite(column_expr).unwrap();

        let expected =
            Arc::new(Literal::new(ScalarValue::Utf8(None))) as Arc<dyn PhysicalExpr>;

        assert_eq!(result.to_string(), expected.to_string());
    }

    /// A column whose name has a replacement becomes the corresponding literal.
    #[test]
    fn test_replace_columns_with_literals() -> Result<()> {
        let partition_value = ScalarValue::Utf8(Some("test_value".to_string()));
        let replacements = HashMap::from([("partition_col", &partition_value)]);

        let column_expr =
            Arc::new(Column::new("partition_col", 0)) as Arc<dyn PhysicalExpr>;
        let result = replace_columns_with_literals(column_expr, &replacements)?;

        let literal = result
            .as_any()
            .downcast_ref::<expressions::Literal>()
            .expect("Expected literal expression");
        assert_eq!(*literal.value(), partition_value);

        Ok(())
    }

    /// Columns without a replacement are left as columns.
    #[test]
    fn test_replace_columns_with_literals_no_match() -> Result<()> {
        let value = ScalarValue::Utf8(Some("test_value".to_string()));
        let replacements = HashMap::from([("other_col", &value)]);

        let column_expr =
            Arc::new(Column::new("partition_col", 0)) as Arc<dyn PhysicalExpr>;
        let result = replace_columns_with_literals(column_expr, &replacements)?;

        assert!(result.as_any().downcast_ref::<Column>().is_some());
        Ok(())
    }

    /// Replacement reaches columns nested inside other expressions.
    #[test]
    fn test_replace_columns_with_literals_nested_expr() -> Result<()> {
        let value_a = ScalarValue::Int64(Some(10));
        let value_b = ScalarValue::Int64(Some(20));
        let replacements = HashMap::from([("a", &value_a), ("b", &value_b)]);

        let expr = Arc::new(expressions::BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            Arc::new(Column::new("b", 1)),
        )) as Arc<dyn PhysicalExpr>;

        let result = replace_columns_with_literals(expr, &replacements)?;
        assert_eq!(result.to_string(), "10 + 20");

        Ok(())
    }

    /// When index and type already match, the very same expression is returned.
    #[test]
    fn test_rewrite_no_change_needed() -> Result<()> {
        let (physical_schema, logical_schema) = create_test_schema();

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

        let result = adapter.rewrite(Arc::clone(&column_expr))?;

        // `Arc::ptr_eq` compares allocation addresses only, ignoring the vtable
        // metadata of the `dyn` pointer.
        assert!(Arc::ptr_eq(&column_expr, &result));

        Ok(())
    }

    /// Same as the non-nullable error test, but with no cast pending on `a`.
    #[test]
    fn test_non_nullable_missing_column_error() {
        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let logical_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("b", 1));

        let result = adapter.rewrite(column_expr);
        assert!(result.is_err());
        assert_contains!(
            result.unwrap_err().to_string(),
            "Non-nullable column 'b' is missing from the physical schema"
        );
    }

    /// Evaluates each expression against `batch` and assembles the results into a
    /// batch with `schema`. The row count is preserved even when `expr` is empty.
    fn batch_project(
        expr: Vec<Arc<dyn PhysicalExpr>>,
        batch: &RecordBatch,
        schema: SchemaRef,
    ) -> Result<RecordBatch> {
        let arrays = expr
            .iter()
            .map(|expr| {
                expr.evaluate(batch)
                    .and_then(|v| v.into_array(batch.num_rows()))
            })
            .collect::<Result<Vec<_>>>()?;

        if arrays.is_empty() {
            let options =
                RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
            RecordBatch::try_new_with_options(Arc::clone(&schema), arrays, &options)
                .map_err(Into::into)
        } else {
            RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(Into::into)
        }
    }

    /// End-to-end: `b` (absent from the file) is null-filled, `a` is widened to
    /// Int64, and the extra physical column `extra` is ignored.
    #[test]
    fn test_adapt_batches() {
        let physical_batch = record_batch!(
            ("a", Int32, vec![Some(1), None, Some(3)]),
            ("extra", Utf8, vec![Some("x"), Some("y"), None])
        )
        .unwrap();

        let physical_schema = physical_batch.schema();

        let logical_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        let projection = vec![
            col("b", &logical_schema).unwrap(),
            col("a", &logical_schema).unwrap(),
        ];

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter =
            factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));

        let adapted_projection = projection
            .into_iter()
            .map(|expr| adapter.rewrite(expr).unwrap())
            .collect_vec();

        let adapted_schema = Arc::new(Schema::new(
            adapted_projection
                .iter()
                .map(|expr| expr.return_field(&physical_schema).unwrap())
                .collect_vec(),
        ));

        let res = batch_project(
            adapted_projection,
            &physical_batch,
            Arc::clone(&adapted_schema),
        )
        .unwrap();

        assert_eq!(res.num_columns(), 2);
        assert_eq!(res.column(0).data_type(), &DataType::Utf8);
        assert_eq!(res.column(1).data_type(), &DataType::Int64);
        assert_eq!(
            res.column(0)
                .as_any()
                .downcast_ref::<arrow::array::StringArray>()
                .unwrap()
                .iter()
                .collect_vec(),
            vec![None, None, None]
        );
        assert_eq!(
            res.column(1)
                .as_any()
                .downcast_ref::<arrow::array::Int64Array>()
                .unwrap()
                .iter()
                .collect_vec(),
            vec![Some(1), None, Some(3)]
        );
    }

    /// End-to-end struct adaptation: `id` is widened, `name` becomes Utf8View, and
    /// `extra`, which exists only in the logical struct, is null-filled.
    #[test]
    fn test_adapt_struct_batches() {
        let physical_struct_fields: Fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]
        .into();

        let struct_array = StructArray::new(
            physical_struct_fields.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
                Arc::new(StringArray::from(vec![
                    Some("alice"),
                    None,
                    Some("charlie"),
                ])) as _,
            ],
            None,
        );

        let physical_schema = Arc::new(Schema::new(vec![Field::new(
            "data",
            DataType::Struct(physical_struct_fields),
            false,
        )]));

        let physical_batch = RecordBatch::try_new(
            Arc::clone(&physical_schema),
            vec![Arc::new(struct_array)],
        )
        .unwrap();

        let logical_struct_fields: Fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8View, true),
            Field::new("extra", DataType::Boolean, true),
        ]
        .into();

        let logical_schema = Arc::new(Schema::new(vec![Field::new(
            "data",
            DataType::Struct(logical_struct_fields),
            false,
        )]));

        let projection = vec![col("data", &logical_schema).unwrap()];

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter =
            factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));

        let adapted_projection = projection
            .into_iter()
            .map(|expr| adapter.rewrite(expr).unwrap())
            .collect_vec();

        let adapted_schema = Arc::new(Schema::new(
            adapted_projection
                .iter()
                .map(|expr| expr.return_field(&physical_schema).unwrap())
                .collect_vec(),
        ));

        let res = batch_project(
            adapted_projection,
            &physical_batch,
            Arc::clone(&adapted_schema),
        )
        .unwrap();

        assert_eq!(res.num_columns(), 1);

        let result_struct = res
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        let id_col = result_struct.column_by_name("id").unwrap();
        assert_eq!(id_col.data_type(), &DataType::Int64);
        let id_values = id_col.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(
            id_values.iter().collect_vec(),
            vec![Some(1), Some(2), Some(3)]
        );

        let name_col = result_struct.column_by_name("name").unwrap();
        assert_eq!(name_col.data_type(), &DataType::Utf8View);
        let name_values = name_col.as_any().downcast_ref::<StringViewArray>().unwrap();
        assert_eq!(
            name_values.iter().collect_vec(),
            vec![Some("alice"), None, Some("charlie")]
        );

        let extra_col = result_struct.column_by_name("extra").unwrap();
        assert_eq!(extra_col.data_type(), &DataType::Boolean);
        let extra_values = extra_col.as_any().downcast_ref::<BooleanArray>().unwrap();
        assert_eq!(extra_values.iter().collect_vec(), vec![None, None, None]);
    }

    /// The struct-field shortcut only matches `get_field` calls; a bare column is
    /// left for the regular column rewrite (`None`).
    #[test]
    fn test_try_rewrite_struct_field_access() {
        let physical_schema = Schema::new(vec![Field::new(
            "struct_col",
            DataType::Struct(
                vec![Field::new("existing_field", DataType::Int32, true)].into(),
            ),
            true,
        )]);

        let logical_schema = Schema::new(vec![Field::new(
            "struct_col",
            DataType::Struct(
                vec![
                    Field::new("existing_field", DataType::Int32, true),
                    Field::new("missing_field", DataType::Utf8, true),
                ]
                .into(),
            ),
            true,
        )]);

        let rewriter = DefaultPhysicalExprAdapterRewriter {
            logical_file_schema: &logical_schema,
            physical_file_schema: &physical_schema,
        };

        let column = Arc::new(Column::new("struct_col", 0)) as Arc<dyn PhysicalExpr>;
        let result = rewriter.try_rewrite_struct_field_access(&column).unwrap();
        assert!(result.is_none());
    }
}