Skip to main content

datafusion_spark/function/collection/
size.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{Array, ArrayRef, AsArray, Int32Array};
19use arrow::compute::kernels::length::length as arrow_length;
20use arrow::datatypes::{DataType, Field, FieldRef};
21use datafusion_common::{Result, plan_err};
22use datafusion_expr::{
23    ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, ReturnFieldArgs,
24    ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
25};
26use datafusion_functions::utils::make_scalar_function;
27use std::any::Any;
28use std::sync::Arc;
29
30/// Spark-compatible `size` function.
31///
32/// Returns the number of elements in an array or the number of key-value pairs in a map.
33/// Returns -1 for null input (Spark behavior).
34#[derive(Debug, PartialEq, Eq, Hash)]
35pub struct SparkSize {
36    signature: Signature,
37}
38
39impl Default for SparkSize {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl SparkSize {
46    pub fn new() -> Self {
47        Self {
48            signature: Signature::one_of(
49                vec![
50                    // Array Type
51                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
52                        arguments: vec![ArrayFunctionArgument::Array],
53                        array_coercion: None,
54                    }),
55                    // Map Type
56                    TypeSignature::ArraySignature(ArrayFunctionSignature::MapArray),
57                ],
58                Volatility::Immutable,
59            ),
60        }
61    }
62}
63
64impl ScalarUDFImpl for SparkSize {
65    fn as_any(&self) -> &dyn Any {
66        self
67    }
68
69    fn name(&self) -> &str {
70        "size"
71    }
72
73    fn signature(&self) -> &Signature {
74        &self.signature
75    }
76
77    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
78        Ok(DataType::Int32)
79    }
80
81    fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
82        // nullable=false for legacy behavior (NULL -> -1); set to input nullability for null-on-null
83        Ok(Arc::new(Field::new(self.name(), DataType::Int32, false)))
84    }
85
86    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
87        make_scalar_function(spark_size_inner, vec![])(&args.args)
88    }
89}
90
91fn spark_size_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
92    let array = &args[0];
93
94    match array.data_type() {
95        DataType::List(_) => {
96            if array.null_count() == 0 {
97                Ok(arrow_length(array)?)
98            } else {
99                let list_array = array.as_list::<i32>();
100                let lengths: Vec<i32> = list_array
101                    .offsets()
102                    .lengths()
103                    .enumerate()
104                    .map(|(i, len)| if array.is_null(i) { -1 } else { len as i32 })
105                    .collect();
106                Ok(Arc::new(Int32Array::from(lengths)))
107            }
108        }
109        DataType::FixedSizeList(_, size) => {
110            if array.null_count() == 0 {
111                Ok(arrow_length(array)?)
112            } else {
113                let length: Vec<i32> = (0..array.len())
114                    .map(|i| if array.is_null(i) { -1 } else { *size })
115                    .collect();
116                Ok(Arc::new(Int32Array::from(length)))
117            }
118        }
119        DataType::LargeList(_) => {
120            // Arrow length kernel returns Int64 for LargeList
121            let list_array = array.as_list::<i64>();
122            if array.null_count() == 0 {
123                let lengths: Vec<i32> = list_array
124                    .offsets()
125                    .lengths()
126                    .map(|len| len as i32)
127                    .collect();
128                Ok(Arc::new(Int32Array::from(lengths)))
129            } else {
130                let lengths: Vec<i32> = list_array
131                    .offsets()
132                    .lengths()
133                    .enumerate()
134                    .map(|(i, len)| if array.is_null(i) { -1 } else { len as i32 })
135                    .collect();
136                Ok(Arc::new(Int32Array::from(lengths)))
137            }
138        }
139        DataType::Map(_, _) => {
140            let map_array = array.as_map();
141            let length: Vec<i32> = if array.null_count() == 0 {
142                map_array
143                    .offsets()
144                    .lengths()
145                    .map(|len| len as i32)
146                    .collect()
147            } else {
148                map_array
149                    .offsets()
150                    .lengths()
151                    .enumerate()
152                    .map(|(i, len)| if array.is_null(i) { -1 } else { len as i32 })
153                    .collect()
154            };
155            Ok(Arc::new(Int32Array::from(length)))
156        }
157        DataType::Null => Ok(Arc::new(Int32Array::from(vec![-1; array.len()]))),
158        dt => {
159            plan_err!("size function does not support type: {}", dt)
160        }
161    }
162}