datafusion_comet_spark_expr/struct_funcs/
get_struct_field.rs1use arrow::record_batch::RecordBatch;
19use arrow_array::{Array, StructArray};
20use arrow_schema::{DataType, Field, Schema};
21use datafusion::logical_expr::ColumnarValue;
22use datafusion_common::{DataFusionError, Result as DataFusionResult, ScalarValue};
23use datafusion_physical_expr::PhysicalExpr;
24use std::{
25 any::Any,
26 fmt::{Display, Formatter},
27 hash::Hash,
28 sync::Arc,
29};
30
31#[derive(Debug, Eq)]
32pub struct GetStructField {
33 child: Arc<dyn PhysicalExpr>,
34 ordinal: usize,
35}
36
37impl Hash for GetStructField {
38 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
39 self.child.hash(state);
40 self.ordinal.hash(state);
41 }
42}
43impl PartialEq for GetStructField {
44 fn eq(&self, other: &Self) -> bool {
45 self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
46 }
47}
48
49impl GetStructField {
50 pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
51 Self { child, ordinal }
52 }
53
54 fn child_field(&self, input_schema: &Schema) -> DataFusionResult<Arc<Field>> {
55 match self.child.data_type(input_schema)? {
56 DataType::Struct(fields) => Ok(Arc::clone(&fields[self.ordinal])),
57 data_type => Err(DataFusionError::Plan(format!(
58 "Expect struct field, got {:?}",
59 data_type
60 ))),
61 }
62 }
63}
64
65impl PhysicalExpr for GetStructField {
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
71 Ok(self.child_field(input_schema)?.data_type().clone())
72 }
73
74 fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
75 Ok(self.child_field(input_schema)?.is_nullable())
76 }
77
78 fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
79 let child_value = self.child.evaluate(batch)?;
80
81 match child_value {
82 ColumnarValue::Array(array) => {
83 let struct_array = array
84 .as_any()
85 .downcast_ref::<StructArray>()
86 .expect("A struct is expected");
87
88 Ok(ColumnarValue::Array(Arc::clone(
89 struct_array.column(self.ordinal),
90 )))
91 }
92 ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => Ok(ColumnarValue::Array(
93 Arc::clone(struct_array.column(self.ordinal)),
94 )),
95 value => Err(DataFusionError::Execution(format!(
96 "Expected a struct array, got {:?}",
97 value
98 ))),
99 }
100 }
101
102 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
103 vec![&self.child]
104 }
105
106 fn with_new_children(
107 self: Arc<Self>,
108 children: Vec<Arc<dyn PhysicalExpr>>,
109 ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
110 Ok(Arc::new(GetStructField::new(
111 Arc::clone(&children[0]),
112 self.ordinal,
113 )))
114 }
115}
116
117impl Display for GetStructField {
118 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119 write!(
120 f,
121 "GetStructField [child: {:?}, ordinal: {:?}]",
122 self.child, self.ordinal
123 )
124 }
125}