Skip to main content

datafusion_spark/function/array/
spark_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
use std::{any::Any, sync::Arc};

use arrow::array::{Array, ArrayRef, ListArray, new_null_array};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{Result, internal_err};
use datafusion_expr::{
    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
    Volatility,
};
use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};

use crate::function::functions_nested_utils::make_scalar_function;
31
32const ARRAY_FIELD_DEFAULT_NAME: &str = "element";
33
34#[derive(Debug, PartialEq, Eq, Hash)]
35pub struct SparkArray {
36    signature: Signature,
37}
38
39impl Default for SparkArray {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl SparkArray {
46    pub fn new() -> Self {
47        Self {
48            signature: Signature::user_defined(Volatility::Immutable),
49        }
50    }
51}
52
53impl ScalarUDFImpl for SparkArray {
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57
58    fn name(&self) -> &str {
59        "array"
60    }
61
62    fn signature(&self) -> &Signature {
63        &self.signature
64    }
65
66    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
67        internal_err!("return_field_from_args should be used instead")
68    }
69
70    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
71        let data_types = args
72            .arg_fields
73            .iter()
74            .map(|f| f.data_type())
75            .cloned()
76            .collect::<Vec<_>>();
77
78        let mut expr_type = DataType::Null;
79        for arg_type in &data_types {
80            if !arg_type.equals_datatype(&DataType::Null) {
81                expr_type = arg_type.clone();
82                break;
83            }
84        }
85
86        let return_type = DataType::List(Arc::new(Field::new(
87            ARRAY_FIELD_DEFAULT_NAME,
88            expr_type,
89            true,
90        )));
91
92        Ok(Arc::new(Field::new(
93            "this_field_name_is_irrelevant",
94            return_type,
95            false,
96        )))
97    }
98
99    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
100        let ScalarFunctionArgs { args, .. } = args;
101        make_scalar_function(make_array_inner)(args.as_slice())
102    }
103
104    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
105        if arg_types.is_empty() {
106            Ok(vec![])
107        } else {
108            coerce_types_inner(arg_types, self.name())
109        }
110    }
111}
112
113/// `make_array_inner` is the implementation of the `make_array` function.
114/// Constructs an array using the input `data` as `ArrayRef`.
115/// Returns a reference-counted `Array` instance result.
116pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
117    let mut data_type = DataType::Null;
118    for arg in arrays {
119        let arg_data_type = arg.data_type();
120        if !arg_data_type.equals_datatype(&DataType::Null) {
121            data_type = arg_data_type.clone();
122            break;
123        }
124    }
125
126    match data_type {
127        // Either an empty array or all nulls:
128        DataType::Null => {
129            let length = arrays.iter().map(|a| a.len()).sum();
130            // By default Int32
131            let array = new_null_array(&DataType::Null, length);
132            Ok(Arc::new(
133                SingleRowListArrayBuilder::new(array)
134                    .with_nullable(true)
135                    .with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
136                    .build_list_array(),
137            ))
138        }
139        _ => array_array::<i32>(arrays, data_type, ARRAY_FIELD_DEFAULT_NAME),
140    }
141}