datafusion_spark/function/array/
spark_array.rs1use std::{any::Any, sync::Arc};
19
20use arrow::array::{Array, ArrayRef, new_null_array};
21use arrow::datatypes::{DataType, Field, FieldRef};
22use datafusion_common::utils::SingleRowListArrayBuilder;
23use datafusion_common::{Result, internal_err};
24use datafusion_expr::{
25 ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
26 Volatility,
27};
28use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};
29
30use crate::function::functions_nested_utils::make_scalar_function;
31
/// Name given to the inner item field of every list type/array produced in this file.
const ARRAY_FIELD_DEFAULT_NAME: &str = "element";
33
34#[derive(Debug, PartialEq, Eq, Hash)]
35pub struct SparkArray {
36 signature: Signature,
37}
38
39impl Default for SparkArray {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl SparkArray {
46 pub fn new() -> Self {
47 Self {
48 signature: Signature::user_defined(Volatility::Immutable),
49 }
50 }
51}
52
53impl ScalarUDFImpl for SparkArray {
54 fn as_any(&self) -> &dyn Any {
55 self
56 }
57
58 fn name(&self) -> &str {
59 "array"
60 }
61
62 fn signature(&self) -> &Signature {
63 &self.signature
64 }
65
66 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
67 internal_err!("return_field_from_args should be used instead")
68 }
69
70 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
71 let data_types = args
72 .arg_fields
73 .iter()
74 .map(|f| f.data_type())
75 .cloned()
76 .collect::<Vec<_>>();
77
78 let mut expr_type = DataType::Null;
79 for arg_type in &data_types {
80 if !arg_type.equals_datatype(&DataType::Null) {
81 expr_type = arg_type.clone();
82 break;
83 }
84 }
85
86 let return_type = DataType::List(Arc::new(Field::new(
87 ARRAY_FIELD_DEFAULT_NAME,
88 expr_type,
89 true,
90 )));
91
92 Ok(Arc::new(Field::new(
93 "this_field_name_is_irrelevant",
94 return_type,
95 false,
96 )))
97 }
98
99 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
100 let ScalarFunctionArgs { args, .. } = args;
101 make_scalar_function(make_array_inner)(args.as_slice())
102 }
103
104 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
105 if arg_types.is_empty() {
106 Ok(vec![])
107 } else {
108 coerce_types_inner(arg_types, self.name())
109 }
110 }
111}
112
113pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
117 let mut data_type = DataType::Null;
118 for arg in arrays {
119 let arg_data_type = arg.data_type();
120 if !arg_data_type.equals_datatype(&DataType::Null) {
121 data_type = arg_data_type.clone();
122 break;
123 }
124 }
125
126 match data_type {
127 DataType::Null => {
129 let length = arrays.iter().map(|a| a.len()).sum();
130 let array = new_null_array(&DataType::Null, length);
132 Ok(Arc::new(
133 SingleRowListArrayBuilder::new(array)
134 .with_nullable(true)
135 .with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
136 .build_list_array(),
137 ))
138 }
139 _ => array_array::<i32>(arrays, data_type, ARRAY_FIELD_DEFAULT_NAME),
140 }
141}