datafusion_functions_nested/
extract.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions.
19
20use arrow::array::{
21    Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array,
22    MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27    DataType::{FixedSizeList, LargeList, List, Null},
28    Field,
29};
30use datafusion_common::cast::as_int64_array;
31use datafusion_common::cast::as_large_list_array;
32use datafusion_common::cast::as_list_array;
33use datafusion_common::utils::ListCoercion;
34use datafusion_common::{
35    exec_datafusion_err, exec_err, internal_datafusion_err, plan_err,
36    utils::take_function_args, Result,
37};
38use datafusion_expr::{
39    ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
40};
41use datafusion_expr::{
42    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use std::any::Any;
46use std::sync::Arc;
47
48use crate::utils::make_scalar_function;
49
// Create static instances of ScalarUDFs for each function
//
// NOTE(review): `make_udf_expr_and_func!` and `create_func!` are defined outside
// this file. Judging from the invocations below, the arguments appear to be, in
// order: the UDF struct, the name of the generated expression-builder function,
// the builder's argument name(s), the builder's doc string, and the name of the
// generated UDF accessor — confirm against the macro definitions.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `array_slice` takes an optional `stride`, so its expression builder is written
// by hand later in this file (it calls `array_slice_udf()`); presumably this
// macro only generates that accessor — verify against the macro definition.
create_func!(ArraySlice, array_slice_udf);

make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
84
85#[user_doc(
86    doc_section(label = "Array Functions"),
87    description = "Extracts the element with the index n from the array.",
88    syntax_example = "array_element(array, index)",
89    sql_example = r#"```sql
90> select array_element([1, 2, 3, 4], 3);
91+-----------------------------------------+
92| array_element(List([1,2,3,4]),Int64(3)) |
93+-----------------------------------------+
94| 3                                       |
95+-----------------------------------------+
96```"#,
97    argument(
98        name = "array",
99        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
100    ),
101    argument(
102        name = "index",
103        description = "Index to extract the element from the array."
104    )
105)]
106#[derive(Debug, PartialEq, Eq, Hash)]
107pub struct ArrayElement {
108    signature: Signature,
109    aliases: Vec<String>,
110}
111
112impl Default for ArrayElement {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
118impl ArrayElement {
119    pub fn new() -> Self {
120        Self {
121            signature: Signature::array_and_index(Volatility::Immutable),
122            aliases: vec![
123                String::from("array_extract"),
124                String::from("list_element"),
125                String::from("list_extract"),
126            ],
127        }
128    }
129}
130
131impl ScalarUDFImpl for ArrayElement {
132    fn as_any(&self) -> &dyn Any {
133        self
134    }
135    fn name(&self) -> &str {
136        "array_element"
137    }
138
139    fn display_name(&self, args: &[Expr]) -> Result<String> {
140        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141        if args_name.len() != 2 {
142            return exec_err!("expect 2 args, got {}", args_name.len());
143        }
144
145        Ok(format!("{}[{}]", args_name[0], args_name[1]))
146    }
147
148    fn schema_name(&self, args: &[Expr]) -> Result<String> {
149        let args_name = args
150            .iter()
151            .map(|e| e.schema_name().to_string())
152            .collect::<Vec<_>>();
153        if args_name.len() != 2 {
154            return exec_err!("expect 2 args, got {}", args_name.len());
155        }
156
157        Ok(format!("{}[{}]", args_name[0], args_name[1]))
158    }
159
160    fn signature(&self) -> &Signature {
161        &self.signature
162    }
163
164    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165        match &arg_types[0] {
166            Null => Ok(Null),
167            List(field) | LargeList(field) => Ok(field.data_type().clone()),
168            arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
169        }
170    }
171
172    fn invoke_with_args(
173        &self,
174        args: datafusion_expr::ScalarFunctionArgs,
175    ) -> Result<ColumnarValue> {
176        make_scalar_function(array_element_inner)(&args.args)
177    }
178
179    fn aliases(&self) -> &[String] {
180        &self.aliases
181    }
182
183    fn documentation(&self) -> Option<&Documentation> {
184        self.doc()
185    }
186}
187
188/// array_element SQL function
189///
190/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
191/// `array_element(array, index)`
192///
193/// For example:
194/// > array_element(\[1, 2, 3], 2) -> 2
195fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
196    let [array, indexes] = take_function_args("array_element", args)?;
197
198    match &array.data_type() {
199        Null => Ok(Arc::new(NullArray::new(array.len()))),
200        List(_) => {
201            let array = as_list_array(&array)?;
202            let indexes = as_int64_array(&indexes)?;
203            general_array_element::<i32>(array, indexes)
204        }
205        LargeList(_) => {
206            let array = as_large_list_array(&array)?;
207            let indexes = as_int64_array(&indexes)?;
208            general_array_element::<i64>(array, indexes)
209        }
210        arg_type => {
211            exec_err!("array_element does not support type {arg_type}")
212        }
213    }
214}
215
216fn general_array_element<O: OffsetSizeTrait>(
217    array: &GenericListArray<O>,
218    indexes: &Int64Array,
219) -> Result<ArrayRef>
220where
221    i64: TryInto<O>,
222{
223    let values = array.values();
224    if values.data_type().is_null() {
225        return Ok(Arc::new(NullArray::new(array.len())));
226    }
227
228    let original_data = values.to_data();
229    let capacity = Capacities::Array(original_data.len());
230
231    // use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
232    let mut mutable =
233        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
234
235    fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
236    where
237        i64: TryInto<O>,
238    {
239        let index: O = index.try_into().map_err(|_| {
240            exec_datafusion_err!("array_element got invalid index: {index}")
241        })?;
242        // 0 ~ len - 1
243        let adjusted_zero_index = if index < O::usize_as(0) {
244            index + len
245        } else {
246            index - O::usize_as(1)
247        };
248
249        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
250            Ok(Some(adjusted_zero_index))
251        } else {
252            // Out of bounds
253            Ok(None)
254        }
255    }
256
257    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
258        let start = offset_window[0];
259        let end = offset_window[1];
260        let len = end - start;
261
262        // array is null
263        if len == O::usize_as(0) {
264            mutable.extend_nulls(1);
265            continue;
266        }
267
268        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
269
270        if let Some(index) = index {
271            let start = start.as_usize() + index.as_usize();
272            mutable.extend(0, start, start + 1_usize);
273        } else {
274            // Index out of bounds
275            mutable.extend_nulls(1);
276        }
277    }
278
279    let data = mutable.freeze();
280    Ok(arrow::array::make_array(data))
281}
282
283#[doc = "returns a slice of the array."]
284pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
285    let args = match stride {
286        Some(stride) => vec![array, begin, end, stride],
287        None => vec![array, begin, end],
288    };
289    array_slice_udf().call(args)
290}
291
292#[user_doc(
293    doc_section(label = "Array Functions"),
294    description = "Returns a slice of the array based on 1-indexed start and end positions.",
295    syntax_example = "array_slice(array, begin, end)",
296    sql_example = r#"```sql
297> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
298+--------------------------------------------------------+
299| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
300+--------------------------------------------------------+
301| [3, 4, 5, 6]                                           |
302+--------------------------------------------------------+
303```"#,
304    argument(
305        name = "array",
306        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
307    ),
308    argument(
309        name = "begin",
310        description = "Index of the first element. If negative, it counts backward from the end of the array."
311    ),
312    argument(
313        name = "end",
314        description = "Index of the last element. If negative, it counts backward from the end of the array."
315    ),
316    argument(
317        name = "stride",
318        description = "Stride of the array slice. The default is 1."
319    )
320)]
321#[derive(Debug, PartialEq, Eq, Hash)]
322pub(super) struct ArraySlice {
323    signature: Signature,
324    aliases: Vec<String>,
325}
326
327impl ArraySlice {
328    pub fn new() -> Self {
329        Self {
330            signature: Signature::one_of(
331                vec![
332                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
333                        arguments: vec![
334                            ArrayFunctionArgument::Array,
335                            ArrayFunctionArgument::Index,
336                            ArrayFunctionArgument::Index,
337                        ],
338                        array_coercion: Some(ListCoercion::FixedSizedListToList),
339                    }),
340                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
341                        arguments: vec![
342                            ArrayFunctionArgument::Array,
343                            ArrayFunctionArgument::Index,
344                            ArrayFunctionArgument::Index,
345                            ArrayFunctionArgument::Index,
346                        ],
347                        array_coercion: Some(ListCoercion::FixedSizedListToList),
348                    }),
349                ],
350                Volatility::Immutable,
351            ),
352            aliases: vec![String::from("list_slice")],
353        }
354    }
355}
356
357impl ScalarUDFImpl for ArraySlice {
358    fn as_any(&self) -> &dyn Any {
359        self
360    }
361
362    fn display_name(&self, args: &[Expr]) -> Result<String> {
363        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
364        if let Some((arr, indexes)) = args_name.split_first() {
365            Ok(format!("{arr}[{}]", indexes.join(":")))
366        } else {
367            exec_err!("no argument")
368        }
369    }
370
371    fn schema_name(&self, args: &[Expr]) -> Result<String> {
372        let args_name = args
373            .iter()
374            .map(|e| e.schema_name().to_string())
375            .collect::<Vec<_>>();
376        if let Some((arr, indexes)) = args_name.split_first() {
377            Ok(format!("{arr}[{}]", indexes.join(":")))
378        } else {
379            exec_err!("no argument")
380        }
381    }
382
383    fn name(&self) -> &str {
384        "array_slice"
385    }
386
387    fn signature(&self) -> &Signature {
388        &self.signature
389    }
390
391    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
392        Ok(arg_types[0].clone())
393    }
394
395    fn invoke_with_args(
396        &self,
397        args: datafusion_expr::ScalarFunctionArgs,
398    ) -> Result<ColumnarValue> {
399        make_scalar_function(array_slice_inner)(&args.args)
400    }
401
402    fn aliases(&self) -> &[String] {
403        &self.aliases
404    }
405
406    fn documentation(&self) -> Option<&Documentation> {
407        self.doc()
408    }
409}
410
411/// array_slice SQL function
412///
413/// We follow the behavior of array_slice in DuckDB
414/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
415///
416/// > array_slice(array, from, to)
417///
418/// Positive index is treated as the index from the start of the array. If the
419/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
420/// length of the array, it is treated as the length of the array.
421///
422/// Negative index is treated as the index from the end of the array. If the index
423/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
424/// The `to` index is exclusive like python slice syntax.
425///
426/// See test cases in `array.slt` for more details.
427fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
428    let args_len = args.len();
429    if args_len != 3 && args_len != 4 {
430        return exec_err!("array_slice needs three or four arguments");
431    }
432
433    let stride = if args_len == 4 {
434        Some(as_int64_array(&args[3])?)
435    } else {
436        None
437    };
438
439    let from_array = as_int64_array(&args[1])?;
440    let to_array = as_int64_array(&args[2])?;
441
442    let array_data_type = args[0].data_type();
443    match array_data_type {
444        List(_) => {
445            let array = as_list_array(&args[0])?;
446            general_array_slice::<i32>(array, from_array, to_array, stride)
447        }
448        LargeList(_) => {
449            let array = as_large_list_array(&args[0])?;
450            general_array_slice::<i64>(array, from_array, to_array, stride)
451        }
452        _ => exec_err!("array_slice does not support type: {}", array_data_type),
453    }
454}
455
456fn general_array_slice<O: OffsetSizeTrait>(
457    array: &GenericListArray<O>,
458    from_array: &Int64Array,
459    to_array: &Int64Array,
460    stride: Option<&Int64Array>,
461) -> Result<ArrayRef>
462where
463    i64: TryInto<O>,
464{
465    let values = array.values();
466    let original_data = values.to_data();
467    let capacity = Capacities::Array(original_data.len());
468
469    let mut mutable =
470        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
471
472    // We have the slice syntax compatible with DuckDB v0.8.1.
473    // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
474
475    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
476    where
477        i64: TryInto<O>,
478    {
479        // 0 ~ len - 1
480        let adjusted_zero_index = if index < 0 {
481            if let Ok(index) = index.try_into() {
482                // When index < 0 and -index > length, index is clamped to the beginning of the list.
483                // Otherwise, when index < 0, the index is counted from the end of the list.
484                //
485                // Note, we actually test the contrapositive, index < -length, because negating a
486                // negative will panic if the negative is equal to the smallest representable value
487                // while negating a positive is always safe.
488                if index < (O::zero() - O::one()) * len {
489                    O::zero()
490                } else {
491                    index + len
492                }
493            } else {
494                return exec_err!("array_slice got invalid index: {}", index);
495            }
496        } else {
497            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
498            if let Ok(index) = index.try_into() {
499                std::cmp::max(index - O::usize_as(1), O::usize_as(0))
500            } else {
501                return exec_err!("array_slice got invalid index: {}", index);
502            }
503        };
504
505        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
506            Ok(Some(adjusted_zero_index))
507        } else {
508            // Out of bounds
509            Ok(None)
510        }
511    }
512
513    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
514    where
515        i64: TryInto<O>,
516    {
517        // 0 ~ len - 1
518        let adjusted_zero_index = if index < 0 {
519            // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
520            if let Ok(index) = index.try_into() {
521                index + len
522            } else {
523                return exec_err!("array_slice got invalid index: {}", index);
524            }
525        } else {
526            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
527            if let Ok(index) = index.try_into() {
528                std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
529            } else {
530                return exec_err!("array_slice got invalid index: {}", index);
531            }
532        };
533
534        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
535            Ok(Some(adjusted_zero_index))
536        } else {
537            // Out of bounds
538            Ok(None)
539        }
540    }
541
542    let mut offsets = vec![O::usize_as(0)];
543    let mut null_builder = NullBufferBuilder::new(array.len());
544
545    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
546        let start = offset_window[0];
547        let end = offset_window[1];
548        let len = end - start;
549
550        // If any input is null, return null.
551        if array.is_null(row_index)
552            || from_array.is_null(row_index)
553            || to_array.is_null(row_index)
554        {
555            mutable.extend_nulls(1);
556            offsets.push(offsets[row_index] + O::usize_as(1));
557            null_builder.append_null();
558            continue;
559        }
560        null_builder.append_non_null();
561
562        // Empty arrays always return an empty array.
563        if len == O::usize_as(0) {
564            offsets.push(offsets[row_index]);
565            continue;
566        }
567
568        let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
569        let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
570
571        if let (Some(from), Some(to)) = (from_index, to_index) {
572            let stride = stride.map(|s| s.value(row_index));
573            // Default stride is 1 if not provided
574            let stride = stride.unwrap_or(1);
575            if stride.is_zero() {
576                return exec_err!(
577                    "array_slice got invalid stride: {:?}, it cannot be 0",
578                    stride
579                );
580            } else if (from < to && stride.is_negative())
581                || (from > to && stride.is_positive())
582            {
583                // return empty array
584                offsets.push(offsets[row_index]);
585                continue;
586            }
587
588            let stride: O = stride.try_into().map_err(|_| {
589                internal_datafusion_err!("array_slice got invalid stride: {}", stride)
590            })?;
591
592            if from <= to && stride > O::zero() {
593                assert!(start + to <= end);
594                if stride.eq(&O::one()) {
595                    // stride is default to 1
596                    mutable.extend(
597                        0,
598                        (start + from).to_usize().unwrap(),
599                        (start + to + O::usize_as(1)).to_usize().unwrap(),
600                    );
601                    offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
602                    continue;
603                }
604                let mut index = start + from;
605                let mut cnt = 0;
606                while index <= start + to {
607                    mutable.extend(
608                        0,
609                        index.to_usize().unwrap(),
610                        index.to_usize().unwrap() + 1,
611                    );
612                    index += stride;
613                    cnt += 1;
614                }
615                offsets.push(offsets[row_index] + O::usize_as(cnt));
616            } else {
617                let mut index = start + from;
618                let mut cnt = 0;
619                while index >= start + to {
620                    mutable.extend(
621                        0,
622                        index.to_usize().unwrap(),
623                        index.to_usize().unwrap() + 1,
624                    );
625                    index += stride;
626                    cnt += 1;
627                }
628                // invalid range, return empty array
629                offsets.push(offsets[row_index] + O::usize_as(cnt));
630            }
631        } else {
632            // invalid range, return empty array
633            offsets.push(offsets[row_index]);
634        }
635    }
636
637    let data = mutable.freeze();
638
639    Ok(Arc::new(GenericListArray::<O>::try_new(
640        Arc::new(Field::new_list_field(array.value_type(), true)),
641        OffsetBuffer::<O>::new(offsets.into()),
642        arrow::array::make_array(data),
643        null_builder.finish(),
644    )?))
645}
646
647#[user_doc(
648    doc_section(label = "Array Functions"),
649    description = "Returns the array without the first element.",
650    syntax_example = "array_pop_front(array)",
651    sql_example = r#"```sql
652> select array_pop_front([1, 2, 3]);
653+-------------------------------+
654| array_pop_front(List([1,2,3])) |
655+-------------------------------+
656| [2, 3]                        |
657+-------------------------------+
658```"#,
659    argument(
660        name = "array",
661        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
662    )
663)]
664#[derive(Debug, PartialEq, Eq, Hash)]
665pub(super) struct ArrayPopFront {
666    signature: Signature,
667    aliases: Vec<String>,
668}
669
670impl ArrayPopFront {
671    pub fn new() -> Self {
672        Self {
673            signature: Signature::array(Volatility::Immutable),
674            aliases: vec![String::from("list_pop_front")],
675        }
676    }
677}
678
679impl ScalarUDFImpl for ArrayPopFront {
680    fn as_any(&self) -> &dyn Any {
681        self
682    }
683    fn name(&self) -> &str {
684        "array_pop_front"
685    }
686
687    fn signature(&self) -> &Signature {
688        &self.signature
689    }
690
691    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
692        Ok(arg_types[0].clone())
693    }
694
695    fn invoke_with_args(
696        &self,
697        args: datafusion_expr::ScalarFunctionArgs,
698    ) -> Result<ColumnarValue> {
699        make_scalar_function(array_pop_front_inner)(&args.args)
700    }
701
702    fn aliases(&self) -> &[String] {
703        &self.aliases
704    }
705
706    fn documentation(&self) -> Option<&Documentation> {
707        self.doc()
708    }
709}
710
711/// array_pop_front SQL function
712fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
713    let array_data_type = args[0].data_type();
714    match array_data_type {
715        List(_) => {
716            let array = as_list_array(&args[0])?;
717            general_pop_front_list::<i32>(array)
718        }
719        LargeList(_) => {
720            let array = as_large_list_array(&args[0])?;
721            general_pop_front_list::<i64>(array)
722        }
723        _ => exec_err!("array_pop_front does not support type: {}", array_data_type),
724    }
725}
726
727fn general_pop_front_list<O: OffsetSizeTrait>(
728    array: &GenericListArray<O>,
729) -> Result<ArrayRef>
730where
731    i64: TryInto<O>,
732{
733    let from_array = Int64Array::from(vec![2; array.len()]);
734    let to_array = Int64Array::from(
735        array
736            .iter()
737            .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
738            .collect::<Vec<i64>>(),
739    );
740    general_array_slice::<O>(array, &from_array, &to_array, None)
741}
742
743#[user_doc(
744    doc_section(label = "Array Functions"),
745    description = "Returns the array without the last element.",
746    syntax_example = "array_pop_back(array)",
747    sql_example = r#"```sql
748> select array_pop_back([1, 2, 3]);
749+-------------------------------+
750| array_pop_back(List([1,2,3])) |
751+-------------------------------+
752| [1, 2]                        |
753+-------------------------------+
754```"#,
755    argument(
756        name = "array",
757        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
758    )
759)]
760#[derive(Debug, PartialEq, Eq, Hash)]
761pub(super) struct ArrayPopBack {
762    signature: Signature,
763    aliases: Vec<String>,
764}
765
766impl ArrayPopBack {
767    pub fn new() -> Self {
768        Self {
769            signature: Signature::array(Volatility::Immutable),
770            aliases: vec![String::from("list_pop_back")],
771        }
772    }
773}
774
775impl ScalarUDFImpl for ArrayPopBack {
776    fn as_any(&self) -> &dyn Any {
777        self
778    }
779    fn name(&self) -> &str {
780        "array_pop_back"
781    }
782
783    fn signature(&self) -> &Signature {
784        &self.signature
785    }
786
787    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
788        Ok(arg_types[0].clone())
789    }
790
791    fn invoke_with_args(
792        &self,
793        args: datafusion_expr::ScalarFunctionArgs,
794    ) -> Result<ColumnarValue> {
795        make_scalar_function(array_pop_back_inner)(&args.args)
796    }
797
798    fn aliases(&self) -> &[String] {
799        &self.aliases
800    }
801
802    fn documentation(&self) -> Option<&Documentation> {
803        self.doc()
804    }
805}
806
807/// array_pop_back SQL function
808fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
809    let [array] = take_function_args("array_pop_back", args)?;
810
811    match array.data_type() {
812        List(_) => {
813            let array = as_list_array(&array)?;
814            general_pop_back_list::<i32>(array)
815        }
816        LargeList(_) => {
817            let array = as_large_list_array(&array)?;
818            general_pop_back_list::<i64>(array)
819        }
820        _ => exec_err!(
821            "array_pop_back does not support type: {}",
822            array.data_type()
823        ),
824    }
825}
826
827fn general_pop_back_list<O: OffsetSizeTrait>(
828    array: &GenericListArray<O>,
829) -> Result<ArrayRef>
830where
831    i64: TryInto<O>,
832{
833    let from_array = Int64Array::from(vec![1; array.len()]);
834    let to_array = Int64Array::from(
835        array
836            .iter()
837            .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
838            .collect::<Vec<i64>>(),
839    );
840    general_array_slice::<O>(array, &from_array, &to_array, None)
841}
842
843#[user_doc(
844    doc_section(label = "Array Functions"),
845    description = "Returns the first non-null element in the array.",
846    syntax_example = "array_any_value(array)",
847    sql_example = r#"```sql
848> select array_any_value([NULL, 1, 2, 3]);
849+-------------------------------+
850| array_any_value(List([NULL,1,2,3])) |
851+-------------------------------------+
852| 1                                   |
853+-------------------------------------+
854```"#,
855    argument(
856        name = "array",
857        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
858    )
859)]
860#[derive(Debug, PartialEq, Eq, Hash)]
861pub(super) struct ArrayAnyValue {
862    signature: Signature,
863    aliases: Vec<String>,
864}
865
866impl ArrayAnyValue {
867    pub fn new() -> Self {
868        Self {
869            signature: Signature::array(Volatility::Immutable),
870            aliases: vec![String::from("list_any_value")],
871        }
872    }
873}
874
875impl ScalarUDFImpl for ArrayAnyValue {
876    fn as_any(&self) -> &dyn Any {
877        self
878    }
879    fn name(&self) -> &str {
880        "array_any_value"
881    }
882    fn signature(&self) -> &Signature {
883        &self.signature
884    }
885    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
886        match &arg_types[0] {
887            List(field)
888            | LargeList(field)
889            | FixedSizeList(field, _) => Ok(field.data_type().clone()),
890            _ => plan_err!(
891                "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
892            ),
893        }
894    }
895
896    fn invoke_with_args(
897        &self,
898        args: datafusion_expr::ScalarFunctionArgs,
899    ) -> Result<ColumnarValue> {
900        make_scalar_function(array_any_value_inner)(&args.args)
901    }
902
903    fn aliases(&self) -> &[String] {
904        &self.aliases
905    }
906
907    fn documentation(&self) -> Option<&Documentation> {
908        self.doc()
909    }
910}
911
912fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
913    let [array] = take_function_args("array_any_value", args)?;
914
915    match &array.data_type() {
916        List(_) => {
917            let array = as_list_array(&array)?;
918            general_array_any_value::<i32>(array)
919        }
920        LargeList(_) => {
921            let array = as_large_list_array(&array)?;
922            general_array_any_value::<i64>(array)
923        }
924        data_type => exec_err!("array_any_value does not support type: {data_type}"),
925    }
926}
927
928fn general_array_any_value<O: OffsetSizeTrait>(
929    array: &GenericListArray<O>,
930) -> Result<ArrayRef>
931where
932    i64: TryInto<O>,
933{
934    let values = array.values();
935    let original_data = values.to_data();
936    let capacity = Capacities::Array(array.len());
937
938    let mut mutable =
939        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
940
941    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
942        let start = offset_window[0];
943        let end = offset_window[1];
944        let len = end - start;
945
946        // array is null
947        if len == O::usize_as(0) {
948            mutable.extend_nulls(1);
949            continue;
950        }
951
952        let row_value = array.value(row_index);
953        match row_value.nulls() {
954            Some(row_nulls_buffer) => {
955                // nulls are present in the array so try to take the first valid element
956                if let Some(first_non_null_index) =
957                    row_nulls_buffer.valid_indices().next()
958                {
959                    let index = start.as_usize() + first_non_null_index;
960                    mutable.extend(0, index, index + 1)
961                } else {
962                    // all the elements in the array are null
963                    mutable.extend_nulls(1);
964                }
965            }
966            None => {
967                // no nulls are present in the array so take the first element
968                let index = start.as_usize();
969                mutable.extend(0, index, index + 1);
970            }
971        }
972    }
973
974    let data = mutable.freeze();
975    Ok(arrow::array::make_array(data))
976}
977
#[cfg(test)]
mod tests {
    use super::array_element_udf;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;

    // Regression test for https://github.com/apache/datafusion/issues/13755
    //
    // The element type of the list is a FixedSizeList; `array_element` must
    // report exactly that type, both directly and through expression typing.
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let element_field =
            Field::new("some_arbitrary_test_field", DataType::Int32, false);
        let fixed_size_list_type = DataType::FixedSizeList(element_field.into(), 13);

        let list_field = Field::new_list_field(fixed_size_list_type.clone(), true);
        let array_type = DataType::List(list_field.into());
        let index_type = DataType::Int64;

        let schema_fields = vec![
            Field::new("my_array", array_type.clone(), false),
            Field::new("my_index", index_type.clone(), false),
        ];
        let schema =
            DFSchema::from_unqualified_fields(schema_fields.into(), HashMap::default())
                .unwrap();

        // ScalarUDFImpl::return_type
        let direct_type = array_element_udf()
            .return_type(&[array_type.clone(), index_type.clone()])
            .unwrap();
        assert_eq!(direct_type, fixed_size_list_type);

        // Via ExprSchemable::get_type (e.g. SimplifyInfo)
        let column = |name: &str| Expr::Column(Column::new_unqualified(name));
        let udf_expr = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: vec![column("my_array"), column("my_index")],
        });
        let planned_type = udf_expr.get_type(&schema).unwrap();
        assert_eq!(planned_type, fixed_size_list_type);
    }
}