datafusion_comet_spark_expr/struct_funcs/create_named_struct.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use arrow::array::StructArray;
19use arrow::datatypes::{DataType, Field, Schema};
20use arrow::record_batch::RecordBatch;
21use datafusion::common::Result as DataFusionResult;
22use datafusion::logical_expr::ColumnarValue;
23use datafusion::physical_expr::PhysicalExpr;
24use std::{
25    any::Any,
26    fmt::{Display, Formatter},
27    hash::Hash,
28    sync::Arc,
29};
30
31#[derive(Debug, Hash, PartialEq, Eq)]
32pub struct CreateNamedStruct {
33    values: Vec<Arc<dyn PhysicalExpr>>,
34    names: Vec<String>,
35}
36
37impl CreateNamedStruct {
38    pub fn new(values: Vec<Arc<dyn PhysicalExpr>>, names: Vec<String>) -> Self {
39        Self { values, names }
40    }
41
42    fn fields(&self, schema: &Schema) -> DataFusionResult<Vec<Field>> {
43        self.values
44            .iter()
45            .zip(&self.names)
46            .map(|(expr, name)| {
47                let data_type = expr.data_type(schema)?;
48                let nullable = expr.nullable(schema)?;
49                Ok(Field::new(name, data_type, nullable))
50            })
51            .collect()
52    }
53}
54
55impl PhysicalExpr for CreateNamedStruct {
56    fn as_any(&self) -> &dyn Any {
57        self
58    }
59
60    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
61        unimplemented!()
62    }
63
64    fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
65        let fields = self.fields(input_schema)?;
66        Ok(DataType::Struct(fields.into()))
67    }
68
69    fn nullable(&self, _input_schema: &Schema) -> DataFusionResult<bool> {
70        Ok(false)
71    }
72
73    fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
74        let values = self
75            .values
76            .iter()
77            .map(|expr| expr.evaluate(batch))
78            .collect::<datafusion::common::Result<Vec<_>>>()?;
79        let arrays = ColumnarValue::values_to_arrays(&values)?;
80        let fields = self.fields(&batch.schema())?;
81        Ok(ColumnarValue::Array(Arc::new(StructArray::new(
82            fields.into(),
83            arrays,
84            None,
85        ))))
86    }
87
88    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
89        self.values.iter().collect()
90    }
91
92    fn with_new_children(
93        self: Arc<Self>,
94        children: Vec<Arc<dyn PhysicalExpr>>,
95    ) -> datafusion::common::Result<Arc<dyn PhysicalExpr>> {
96        Ok(Arc::new(CreateNamedStruct::new(
97            children.clone(),
98            self.names.clone(),
99        )))
100    }
101}
102
103impl Display for CreateNamedStruct {
104    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105        write!(
106            f,
107            "CreateNamedStruct [values: {:?}, names: {:?}]",
108            self.values, self.names
109        )
110    }
111}
112
#[cfg(test)]
mod test {
    use super::CreateNamedStruct;
    use arrow::array::{
        Array, ArrayRef, AsArray, DictionaryArray, Int32Array, RecordBatch, StringArray,
    };
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::Result;
    use datafusion::physical_expr::expressions::Column;
    use datafusion::physical_expr::PhysicalExpr;
    use datafusion::physical_plan::ColumnarValue;
    use std::sync::Arc;

    /// Evaluates `named_struct('a', a)` over a one-column batch whose column
    /// `a` is `column` (declared as `data_type`). Checks that the result has
    /// one row per input row and a single field `a` that keeps `data_type`.
    fn assert_wraps_column_in_struct(column: ArrayRef, data_type: DataType) -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", data_type.clone(), false)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::clone(&column)])?;
        let expr =
            CreateNamedStruct::new(vec![Arc::new(Column::new("a", 0))], vec!["a".to_string()]);
        let ColumnarValue::Array(result) = expr.evaluate(&batch)? else {
            unreachable!()
        };
        assert_eq!(column.len(), result.len());
        let result = result.as_struct();
        assert_eq!(1, result.num_columns());
        assert_eq!("a", result.fields()[0].name().as_str());
        assert_eq!(&data_type, result.column(0).data_type());
        Ok(())
    }

    #[test]
    fn test_create_struct_from_dict_encoded_i32() -> Result<()> {
        let keys = Int32Array::from(vec![0, 1, 2]);
        let values = Int32Array::from(vec![0, 111, 233]);
        let dict = DictionaryArray::try_new(keys, Arc::new(values))?;
        assert_eq!(3, dict.len());
        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        assert_wraps_column_in_struct(Arc::new(dict), data_type)
    }

    #[test]
    fn test_create_struct_from_dict_encoded_string() -> Result<()> {
        let keys = Int32Array::from(vec![0, 1, 2]);
        let values = StringArray::from(vec!["a".to_string(), "b".to_string(), "c".to_string()]);
        let dict = DictionaryArray::try_new(keys, Arc::new(values))?;
        assert_eq!(3, dict.len());
        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        assert_wraps_column_in_struct(Arc::new(dict), data_type)
    }
}