datafusion_comet_spark_expr/struct_funcs/
create_named_struct.rs1use arrow::array::StructArray;
19use arrow::datatypes::{DataType, Field, Schema};
20use arrow::record_batch::RecordBatch;
21use datafusion::common::Result as DataFusionResult;
22use datafusion::logical_expr::ColumnarValue;
23use datafusion::physical_expr::PhysicalExpr;
24use std::{
25 any::Any,
26 fmt::{Display, Formatter},
27 hash::Hash,
28 sync::Arc,
29};
30
31#[derive(Debug, Hash, PartialEq, Eq)]
32pub struct CreateNamedStruct {
33 values: Vec<Arc<dyn PhysicalExpr>>,
34 names: Vec<String>,
35}
36
37impl CreateNamedStruct {
38 pub fn new(values: Vec<Arc<dyn PhysicalExpr>>, names: Vec<String>) -> Self {
39 Self { values, names }
40 }
41
42 fn fields(&self, schema: &Schema) -> DataFusionResult<Vec<Field>> {
43 self.values
44 .iter()
45 .zip(&self.names)
46 .map(|(expr, name)| {
47 let data_type = expr.data_type(schema)?;
48 let nullable = expr.nullable(schema)?;
49 Ok(Field::new(name, data_type, nullable))
50 })
51 .collect()
52 }
53}
54
55impl PhysicalExpr for CreateNamedStruct {
56 fn as_any(&self) -> &dyn Any {
57 self
58 }
59
60 fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
61 unimplemented!()
62 }
63
64 fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
65 let fields = self.fields(input_schema)?;
66 Ok(DataType::Struct(fields.into()))
67 }
68
69 fn nullable(&self, _input_schema: &Schema) -> DataFusionResult<bool> {
70 Ok(false)
71 }
72
73 fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
74 let values = self
75 .values
76 .iter()
77 .map(|expr| expr.evaluate(batch))
78 .collect::<datafusion::common::Result<Vec<_>>>()?;
79 let arrays = ColumnarValue::values_to_arrays(&values)?;
80 let fields = self.fields(&batch.schema())?;
81 Ok(ColumnarValue::Array(Arc::new(StructArray::new(
82 fields.into(),
83 arrays,
84 None,
85 ))))
86 }
87
88 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
89 self.values.iter().collect()
90 }
91
92 fn with_new_children(
93 self: Arc<Self>,
94 children: Vec<Arc<dyn PhysicalExpr>>,
95 ) -> datafusion::common::Result<Arc<dyn PhysicalExpr>> {
96 Ok(Arc::new(CreateNamedStruct::new(
97 children.clone(),
98 self.names.clone(),
99 )))
100 }
101}
102
103impl Display for CreateNamedStruct {
104 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105 write!(
106 f,
107 "CreateNamedStruct [values: {:?}, names: {:?}]",
108 self.values, self.names
109 )
110 }
111}
112
#[cfg(test)]
mod test {
    use super::CreateNamedStruct;
    use arrow::array::{Array, DictionaryArray, Int32Array, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::Result;
    use datafusion::physical_expr::expressions::Column;
    use datafusion::physical_expr::PhysicalExpr;
    use datafusion::physical_plan::ColumnarValue;
    use std::sync::Arc;

    /// Places `column` in a one-column batch named "a", evaluates a
    /// single-field `CreateNamedStruct` over it, and returns the length of the
    /// produced array.
    fn struct_len_for(column: Arc<dyn Array>, data_type: DataType) -> Result<usize> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", data_type, false)]));
        let batch = RecordBatch::try_new(schema, vec![column])?;
        let expr = CreateNamedStruct::new(
            vec![Arc::new(Column::new("a", 0))],
            vec!["a".to_string()],
        );
        match expr.evaluate(&batch)? {
            ColumnarValue::Array(array) => Ok(array.len()),
            ColumnarValue::Scalar(_) => unreachable!(),
        }
    }

    /// A dictionary-encoded Int32 column can be wrapped into a struct.
    #[test]
    fn test_create_struct_from_dict_encoded_i32() -> Result<()> {
        let dict = DictionaryArray::try_new(
            Int32Array::from(vec![0, 1, 2]),
            Arc::new(Int32Array::from(vec![0, 111, 233])),
        )?;
        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        assert_eq!(3, struct_len_for(Arc::new(dict), data_type)?);
        Ok(())
    }

    /// A dictionary-encoded Utf8 column can be wrapped into a struct.
    #[test]
    fn test_create_struct_from_dict_encoded_string() -> Result<()> {
        let dict = DictionaryArray::try_new(
            Int32Array::from(vec![0, 1, 2]),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        )?;
        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        assert_eq!(3, struct_len_for(Arc::new(dict), data_type)?);
        Ok(())
    }
}