datafusion_comet_spark_expr/struct_funcs/
get_struct_field.rs1use arrow::array::{Array, StructArray};
19use arrow::datatypes::{DataType, Field, Schema};
20use arrow::record_batch::RecordBatch;
21use datafusion::common::{DataFusionError, Result as DataFusionResult, ScalarValue};
22use datafusion::logical_expr::ColumnarValue;
23use datafusion::physical_expr::PhysicalExpr;
24use std::{
25 any::Any,
26 fmt::{Display, Formatter},
27 hash::Hash,
28 sync::Arc,
29};
30
31#[derive(Debug, Eq)]
32pub struct GetStructField {
33 child: Arc<dyn PhysicalExpr>,
34 ordinal: usize,
35}
36
37impl Hash for GetStructField {
38 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
39 self.child.hash(state);
40 self.ordinal.hash(state);
41 }
42}
43impl PartialEq for GetStructField {
44 fn eq(&self, other: &Self) -> bool {
45 self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
46 }
47}
48
49impl GetStructField {
50 pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
51 Self { child, ordinal }
52 }
53
54 fn child_field(&self, input_schema: &Schema) -> DataFusionResult<Arc<Field>> {
55 match self.child.data_type(input_schema)? {
56 DataType::Struct(fields) => Ok(Arc::clone(&fields[self.ordinal])),
57 data_type => Err(DataFusionError::Plan(format!(
58 "Expect struct field, got {data_type:?}"
59 ))),
60 }
61 }
62}
63
64impl PhysicalExpr for GetStructField {
65 fn as_any(&self) -> &dyn Any {
66 self
67 }
68
69 fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
70 unimplemented!()
71 }
72
73 fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
74 Ok(self.child_field(input_schema)?.data_type().clone())
75 }
76
77 fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
78 Ok(self.child_field(input_schema)?.is_nullable())
79 }
80
81 fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
82 let child_value = self.child.evaluate(batch)?;
83
84 match child_value {
85 ColumnarValue::Array(array) => {
86 let struct_array = array
87 .as_any()
88 .downcast_ref::<StructArray>()
89 .expect("A struct is expected");
90
91 Ok(ColumnarValue::Array(Arc::clone(
92 struct_array.column(self.ordinal),
93 )))
94 }
95 ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => Ok(ColumnarValue::Array(
96 Arc::clone(struct_array.column(self.ordinal)),
97 )),
98 value => Err(DataFusionError::Execution(format!(
99 "Expected a struct array, got {value:?}"
100 ))),
101 }
102 }
103
104 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
105 vec![&self.child]
106 }
107
108 fn with_new_children(
109 self: Arc<Self>,
110 children: Vec<Arc<dyn PhysicalExpr>>,
111 ) -> datafusion::common::Result<Arc<dyn PhysicalExpr>> {
112 Ok(Arc::new(GetStructField::new(
113 Arc::clone(&children[0]),
114 self.ordinal,
115 )))
116 }
117}
118
119impl Display for GetStructField {
120 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
121 write!(
122 f,
123 "GetStructField [child: {:?}, ordinal: {:?}]",
124 self.child, self.ordinal
125 )
126 }
127}