datafusion_spark/function/array/
spark_array.rs1use std::sync::Arc;
19
use arrow::array::{Array, ArrayRef, ListArray, new_null_array};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{Result, internal_err};
use datafusion_expr::{
    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
    Volatility,
};
use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};

use crate::function::functions_nested_utils::make_scalar_function;
31
/// Name given to the child (element) field of every list produced by Spark's `array(...)`.
const ARRAY_FIELD_DEFAULT_NAME: &str = "element";
33
34#[derive(Debug, PartialEq, Eq, Hash)]
35pub struct SparkArray {
36 signature: Signature,
37}
38
39impl Default for SparkArray {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl SparkArray {
46 pub fn new() -> Self {
47 Self {
48 signature: Signature::user_defined(Volatility::Immutable),
49 }
50 }
51}
52
53impl ScalarUDFImpl for SparkArray {
54 fn name(&self) -> &str {
55 "array"
56 }
57
58 fn signature(&self) -> &Signature {
59 &self.signature
60 }
61
62 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
63 internal_err!("return_field_from_args should be used instead")
64 }
65
66 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
67 let data_types = args
68 .arg_fields
69 .iter()
70 .map(|f| f.data_type())
71 .cloned()
72 .collect::<Vec<_>>();
73
74 let mut expr_type = DataType::Null;
75 for arg_type in &data_types {
76 if !arg_type.equals_datatype(&DataType::Null) {
77 expr_type = arg_type.clone();
78 break;
79 }
80 }
81
82 let return_type = DataType::List(Arc::new(Field::new(
83 ARRAY_FIELD_DEFAULT_NAME,
84 expr_type,
85 true,
86 )));
87
88 Ok(Arc::new(Field::new(
89 "this_field_name_is_irrelevant",
90 return_type,
91 false,
92 )))
93 }
94
95 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
96 let ScalarFunctionArgs { args, .. } = args;
97 make_scalar_function(make_array_inner)(args.as_slice())
98 }
99
100 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
101 if arg_types.is_empty() {
102 Ok(vec![])
103 } else {
104 coerce_types_inner(arg_types, self.name())
105 }
106 }
107}
108
109pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
113 let mut data_type = DataType::Null;
114 for arg in arrays {
115 let arg_data_type = arg.data_type();
116 if !arg_data_type.equals_datatype(&DataType::Null) {
117 data_type = arg_data_type.clone();
118 break;
119 }
120 }
121
122 match data_type {
123 DataType::Null => {
125 let length = arrays.iter().map(|a| a.len()).sum();
126 let array = new_null_array(&DataType::Null, length);
128 Ok(Arc::new(
129 SingleRowListArrayBuilder::new(array)
130 .with_nullable(true)
131 .with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
132 .build_list_array(),
133 ))
134 }
135 _ => array_array::<i32>(arrays, data_type, ARRAY_FIELD_DEFAULT_NAME),
136 }
137}