Skip to main content

datafusion_spark/function/array/
spark_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{Array, ArrayRef, new_null_array};
21use arrow::datatypes::{DataType, Field, FieldRef};
22use datafusion_common::utils::SingleRowListArrayBuilder;
23use datafusion_common::{Result, internal_err};
24use datafusion_expr::{
25    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
26    Volatility,
27};
28use datafusion_functions_nested::make_array::{array_array, coerce_types_inner};
29
30use crate::function::functions_nested_utils::make_scalar_function;
31
/// Name given to the element (item) field of the list types built in this file.
const ARRAY_FIELD_DEFAULT_NAME: &str = "element";
33
34#[derive(Debug, PartialEq, Eq, Hash)]
35pub struct SparkArray {
36    signature: Signature,
37}
38
39impl Default for SparkArray {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl SparkArray {
46    pub fn new() -> Self {
47        Self {
48            signature: Signature::user_defined(Volatility::Immutable),
49        }
50    }
51}
52
53impl ScalarUDFImpl for SparkArray {
54    fn name(&self) -> &str {
55        "array"
56    }
57
58    fn signature(&self) -> &Signature {
59        &self.signature
60    }
61
62    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
63        internal_err!("return_field_from_args should be used instead")
64    }
65
66    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
67        let data_types = args
68            .arg_fields
69            .iter()
70            .map(|f| f.data_type())
71            .cloned()
72            .collect::<Vec<_>>();
73
74        let mut expr_type = DataType::Null;
75        for arg_type in &data_types {
76            if !arg_type.equals_datatype(&DataType::Null) {
77                expr_type = arg_type.clone();
78                break;
79            }
80        }
81
82        let return_type = DataType::List(Arc::new(Field::new(
83            ARRAY_FIELD_DEFAULT_NAME,
84            expr_type,
85            true,
86        )));
87
88        Ok(Arc::new(Field::new(
89            "this_field_name_is_irrelevant",
90            return_type,
91            false,
92        )))
93    }
94
95    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
96        let ScalarFunctionArgs { args, .. } = args;
97        make_scalar_function(make_array_inner)(args.as_slice())
98    }
99
100    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
101        if arg_types.is_empty() {
102            Ok(vec![])
103        } else {
104            coerce_types_inner(arg_types, self.name())
105        }
106    }
107}
108
109/// `make_array_inner` is the implementation of the `make_array` function.
110/// Constructs an array using the input `data` as `ArrayRef`.
111/// Returns a reference-counted `Array` instance result.
112pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
113    let mut data_type = DataType::Null;
114    for arg in arrays {
115        let arg_data_type = arg.data_type();
116        if !arg_data_type.equals_datatype(&DataType::Null) {
117            data_type = arg_data_type.clone();
118            break;
119        }
120    }
121
122    match data_type {
123        // Either an empty array or all nulls:
124        DataType::Null => {
125            let length = arrays.iter().map(|a| a.len()).sum();
126            // By default Int32
127            let array = new_null_array(&DataType::Null, length);
128            Ok(Arc::new(
129                SingleRowListArrayBuilder::new(array)
130                    .with_nullable(true)
131                    .with_field_name(Some(ARRAY_FIELD_DEFAULT_NAME.to_string()))
132                    .build_list_array(),
133            ))
134        }
135        _ => array_array::<i32>(arrays, data_type, ARRAY_FIELD_DEFAULT_NAME),
136    }
137}