datafusion_functions_nested/
extract.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions.
19
20use arrow::array::{
21    Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array,
22    MutableArrayData, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27    DataType::{FixedSizeList, LargeList, List},
28    Field,
29};
30use datafusion_common::cast::as_int64_array;
31use datafusion_common::cast::as_large_list_array;
32use datafusion_common::cast::as_list_array;
33use datafusion_common::utils::ListCoercion;
34use datafusion_common::{
35    exec_err, internal_datafusion_err, plan_err, utils::take_function_args,
36    DataFusionError, Result,
37};
38use datafusion_expr::{
39    ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
40};
41use datafusion_expr::{
42    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use std::any::Any;
46use std::sync::Arc;
47
48use crate::utils::make_scalar_function;
49
// Create static instances of ScalarUDFs for each function
//
// NOTE(review): `make_udf_expr_and_func!` and `create_func!` are defined elsewhere in
// the crate. Judging from these call sites the arguments appear to be: the UDF struct,
// the generated expression-builder name, that builder's argument names, its doc string,
// and the name of the accessor returning the shared UDF instance — confirm against the
// macro definitions.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `array_slice` only gets the UDF accessor here: its expression builder is written by
// hand further down in this file because the `stride` argument is optional.
create_func!(ArraySlice, array_slice_udf);

make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
84
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Extracts the element with the index n from the array.",
    syntax_example = "array_element(array, index)",
    sql_example = r#"```sql
> select array_element([1, 2, 3, 4], 3);
+-----------------------------------------+
| array_element(List([1,2,3,4]),Int64(3)) |
+-----------------------------------------+
| 3                                       |
+-----------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "index",
        description = "Index to extract the element from the array."
    )
)]
// Scalar UDF implementing `array_element(array, index)`; the user-facing documentation
// is supplied through the `user_doc` attribute above.
#[derive(Debug)]
pub struct ArrayElement {
    // Accepted argument types, returned from `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative names, returned from `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
111
112impl Default for ArrayElement {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
118impl ArrayElement {
119    pub fn new() -> Self {
120        Self {
121            signature: Signature::array_and_index(Volatility::Immutable),
122            aliases: vec![
123                String::from("array_extract"),
124                String::from("list_element"),
125                String::from("list_extract"),
126            ],
127        }
128    }
129}
130
131impl ScalarUDFImpl for ArrayElement {
132    fn as_any(&self) -> &dyn Any {
133        self
134    }
135    fn name(&self) -> &str {
136        "array_element"
137    }
138
139    fn display_name(&self, args: &[Expr]) -> Result<String> {
140        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141        if args_name.len() != 2 {
142            return exec_err!("expect 2 args, got {}", args_name.len());
143        }
144
145        Ok(format!("{}[{}]", args_name[0], args_name[1]))
146    }
147
148    fn schema_name(&self, args: &[Expr]) -> Result<String> {
149        let args_name = args
150            .iter()
151            .map(|e| e.schema_name().to_string())
152            .collect::<Vec<_>>();
153        if args_name.len() != 2 {
154            return exec_err!("expect 2 args, got {}", args_name.len());
155        }
156
157        Ok(format!("{}[{}]", args_name[0], args_name[1]))
158    }
159
160    fn signature(&self) -> &Signature {
161        &self.signature
162    }
163
164    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165        match &arg_types[0] {
166            List(field)
167            | LargeList(field)
168            | FixedSizeList(field, _) => Ok(field.data_type().clone()),
169            DataType::Null => Ok(List(Arc::new(Field::new_list_field(DataType::Int64, true)))),
170            _ => plan_err!(
171                "ArrayElement can only accept List, LargeList or FixedSizeList as the first argument"
172            ),
173        }
174    }
175
176    fn invoke_with_args(
177        &self,
178        args: datafusion_expr::ScalarFunctionArgs,
179    ) -> Result<ColumnarValue> {
180        make_scalar_function(array_element_inner)(&args.args)
181    }
182
183    fn aliases(&self) -> &[String] {
184        &self.aliases
185    }
186
187    fn documentation(&self) -> Option<&Documentation> {
188        self.doc()
189    }
190}
191
192/// array_element SQL function
193///
194/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
195/// `array_element(array, index)`
196///
197/// For example:
198/// > array_element(\[1, 2, 3], 2) -> 2
199fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
200    let [array, indexes] = take_function_args("array_element", args)?;
201
202    match &array.data_type() {
203        List(_) => {
204            let array = as_list_array(&array)?;
205            let indexes = as_int64_array(&indexes)?;
206            general_array_element::<i32>(array, indexes)
207        }
208        LargeList(_) => {
209            let array = as_large_list_array(&array)?;
210            let indexes = as_int64_array(&indexes)?;
211            general_array_element::<i64>(array, indexes)
212        }
213        _ => exec_err!(
214            "array_element does not support type: {:?}",
215            array.data_type()
216        ),
217    }
218}
219
220fn general_array_element<O: OffsetSizeTrait>(
221    array: &GenericListArray<O>,
222    indexes: &Int64Array,
223) -> Result<ArrayRef>
224where
225    i64: TryInto<O>,
226{
227    let values = array.values();
228    let original_data = values.to_data();
229    let capacity = Capacities::Array(original_data.len());
230
231    // use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
232    let mut mutable =
233        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
234
235    fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
236    where
237        i64: TryInto<O>,
238    {
239        let index: O = index.try_into().map_err(|_| {
240            DataFusionError::Execution(format!(
241                "array_element got invalid index: {}",
242                index
243            ))
244        })?;
245        // 0 ~ len - 1
246        let adjusted_zero_index = if index < O::usize_as(0) {
247            index + len
248        } else {
249            index - O::usize_as(1)
250        };
251
252        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
253            Ok(Some(adjusted_zero_index))
254        } else {
255            // Out of bounds
256            Ok(None)
257        }
258    }
259
260    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
261        let start = offset_window[0];
262        let end = offset_window[1];
263        let len = end - start;
264
265        // array is null
266        if len == O::usize_as(0) {
267            mutable.extend_nulls(1);
268            continue;
269        }
270
271        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
272
273        if let Some(index) = index {
274            let start = start.as_usize() + index.as_usize();
275            mutable.extend(0, start, start + 1_usize);
276        } else {
277            // Index out of bounds
278            mutable.extend_nulls(1);
279        }
280    }
281
282    let data = mutable.freeze();
283    Ok(arrow::array::make_array(data))
284}
285
286#[doc = "returns a slice of the array."]
287pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
288    let args = match stride {
289        Some(stride) => vec![array, begin, end, stride],
290        None => vec![array, begin, end],
291    };
292    array_slice_udf().call(args)
293}
294
295#[user_doc(
296    doc_section(label = "Array Functions"),
297    description = "Returns a slice of the array based on 1-indexed start and end positions.",
298    syntax_example = "array_slice(array, begin, end)",
299    sql_example = r#"```sql
300> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
301+--------------------------------------------------------+
302| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
303+--------------------------------------------------------+
304| [3, 4, 5, 6]                                           |
305+--------------------------------------------------------+
306```"#,
307    argument(
308        name = "array",
309        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
310    ),
311    argument(
312        name = "begin",
313        description = "Index of the first element. If negative, it counts backward from the end of the array."
314    ),
315    argument(
316        name = "end",
317        description = "Index of the last element. If negative, it counts backward from the end of the array."
318    ),
319    argument(
320        name = "stride",
321        description = "Stride of the array slice. The default is 1."
322    )
323)]
324#[derive(Debug)]
325pub(super) struct ArraySlice {
326    signature: Signature,
327    aliases: Vec<String>,
328}
329
330impl ArraySlice {
331    pub fn new() -> Self {
332        Self {
333            signature: Signature::one_of(
334                vec![
335                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
336                        arguments: vec![
337                            ArrayFunctionArgument::Array,
338                            ArrayFunctionArgument::Index,
339                            ArrayFunctionArgument::Index,
340                        ],
341                        array_coercion: None,
342                    }),
343                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
344                        arguments: vec![
345                            ArrayFunctionArgument::Array,
346                            ArrayFunctionArgument::Index,
347                            ArrayFunctionArgument::Index,
348                            ArrayFunctionArgument::Index,
349                        ],
350                        array_coercion: None,
351                    }),
352                ],
353                Volatility::Immutable,
354            ),
355            aliases: vec![String::from("list_slice")],
356        }
357    }
358}
359
360impl ScalarUDFImpl for ArraySlice {
361    fn as_any(&self) -> &dyn Any {
362        self
363    }
364
365    fn display_name(&self, args: &[Expr]) -> Result<String> {
366        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
367        if let Some((arr, indexes)) = args_name.split_first() {
368            Ok(format!("{arr}[{}]", indexes.join(":")))
369        } else {
370            exec_err!("no argument")
371        }
372    }
373
374    fn schema_name(&self, args: &[Expr]) -> Result<String> {
375        let args_name = args
376            .iter()
377            .map(|e| e.schema_name().to_string())
378            .collect::<Vec<_>>();
379        if let Some((arr, indexes)) = args_name.split_first() {
380            Ok(format!("{arr}[{}]", indexes.join(":")))
381        } else {
382            exec_err!("no argument")
383        }
384    }
385
386    fn name(&self) -> &str {
387        "array_slice"
388    }
389
390    fn signature(&self) -> &Signature {
391        &self.signature
392    }
393
394    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
395        Ok(arg_types[0].clone())
396    }
397
398    fn invoke_with_args(
399        &self,
400        args: datafusion_expr::ScalarFunctionArgs,
401    ) -> Result<ColumnarValue> {
402        make_scalar_function(array_slice_inner)(&args.args)
403    }
404
405    fn aliases(&self) -> &[String] {
406        &self.aliases
407    }
408
409    fn documentation(&self) -> Option<&Documentation> {
410        self.doc()
411    }
412}
413
414/// array_slice SQL function
415///
416/// We follow the behavior of array_slice in DuckDB
417/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
418///
419/// > array_slice(array, from, to)
420///
421/// Positive index is treated as the index from the start of the array. If the
422/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
423/// length of the array, it is treated as the length of the array.
424///
425/// Negative index is treated as the index from the end of the array. If the index
426/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
427/// The `to` index is exclusive like python slice syntax.
428///
429/// See test cases in `array.slt` for more details.
430fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
431    let args_len = args.len();
432    if args_len != 3 && args_len != 4 {
433        return exec_err!("array_slice needs three or four arguments");
434    }
435
436    let stride = if args_len == 4 {
437        Some(as_int64_array(&args[3])?)
438    } else {
439        None
440    };
441
442    let from_array = as_int64_array(&args[1])?;
443    let to_array = as_int64_array(&args[2])?;
444
445    let array_data_type = args[0].data_type();
446    match array_data_type {
447        List(_) => {
448            let array = as_list_array(&args[0])?;
449            general_array_slice::<i32>(array, from_array, to_array, stride)
450        }
451        LargeList(_) => {
452            let array = as_large_list_array(&args[0])?;
453            general_array_slice::<i64>(array, from_array, to_array, stride)
454        }
455        _ => exec_err!("array_slice does not support type: {:?}", array_data_type),
456    }
457}
458
459fn general_array_slice<O: OffsetSizeTrait>(
460    array: &GenericListArray<O>,
461    from_array: &Int64Array,
462    to_array: &Int64Array,
463    stride: Option<&Int64Array>,
464) -> Result<ArrayRef>
465where
466    i64: TryInto<O>,
467{
468    let values = array.values();
469    let original_data = values.to_data();
470    let capacity = Capacities::Array(original_data.len());
471
472    let mut mutable =
473        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
474
475    // We have the slice syntax compatible with DuckDB v0.8.1.
476    // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
477
478    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
479    where
480        i64: TryInto<O>,
481    {
482        // 0 ~ len - 1
483        let adjusted_zero_index = if index < 0 {
484            if let Ok(index) = index.try_into() {
485                // When index < 0 and -index > length, index is clamped to the beginning of the list.
486                // Otherwise, when index < 0, the index is counted from the end of the list.
487                //
488                // Note, we actually test the contrapositive, index < -length, because negating a
489                // negative will panic if the negative is equal to the smallest representable value
490                // while negating a positive is always safe.
491                if index < (O::zero() - O::one()) * len {
492                    O::zero()
493                } else {
494                    index + len
495                }
496            } else {
497                return exec_err!("array_slice got invalid index: {}", index);
498            }
499        } else {
500            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
501            if let Ok(index) = index.try_into() {
502                std::cmp::max(index - O::usize_as(1), O::usize_as(0))
503            } else {
504                return exec_err!("array_slice got invalid index: {}", index);
505            }
506        };
507
508        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
509            Ok(Some(adjusted_zero_index))
510        } else {
511            // Out of bounds
512            Ok(None)
513        }
514    }
515
516    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
517    where
518        i64: TryInto<O>,
519    {
520        // 0 ~ len - 1
521        let adjusted_zero_index = if index < 0 {
522            // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
523            if let Ok(index) = index.try_into() {
524                index + len
525            } else {
526                return exec_err!("array_slice got invalid index: {}", index);
527            }
528        } else {
529            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
530            if let Ok(index) = index.try_into() {
531                std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
532            } else {
533                return exec_err!("array_slice got invalid index: {}", index);
534            }
535        };
536
537        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
538            Ok(Some(adjusted_zero_index))
539        } else {
540            // Out of bounds
541            Ok(None)
542        }
543    }
544
545    let mut offsets = vec![O::usize_as(0)];
546    let mut null_builder = NullBufferBuilder::new(array.len());
547
548    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
549        let start = offset_window[0];
550        let end = offset_window[1];
551        let len = end - start;
552
553        // If any input is null, return null.
554        if array.is_null(row_index)
555            || from_array.is_null(row_index)
556            || to_array.is_null(row_index)
557        {
558            mutable.extend_nulls(1);
559            offsets.push(offsets[row_index] + O::usize_as(1));
560            null_builder.append_null();
561            continue;
562        }
563        null_builder.append_non_null();
564
565        // Empty arrays always return an empty array.
566        if len == O::usize_as(0) {
567            offsets.push(offsets[row_index]);
568            continue;
569        }
570
571        let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
572        let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
573
574        if let (Some(from), Some(to)) = (from_index, to_index) {
575            let stride = stride.map(|s| s.value(row_index));
576            // Default stride is 1 if not provided
577            let stride = stride.unwrap_or(1);
578            if stride.is_zero() {
579                return exec_err!(
580                    "array_slice got invalid stride: {:?}, it cannot be 0",
581                    stride
582                );
583            } else if (from < to && stride.is_negative())
584                || (from > to && stride.is_positive())
585            {
586                // return empty array
587                offsets.push(offsets[row_index]);
588                continue;
589            }
590
591            let stride: O = stride.try_into().map_err(|_| {
592                internal_datafusion_err!("array_slice got invalid stride: {}", stride)
593            })?;
594
595            if from <= to && stride > O::zero() {
596                assert!(start + to <= end);
597                if stride.eq(&O::one()) {
598                    // stride is default to 1
599                    mutable.extend(
600                        0,
601                        (start + from).to_usize().unwrap(),
602                        (start + to + O::usize_as(1)).to_usize().unwrap(),
603                    );
604                    offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
605                    continue;
606                }
607                let mut index = start + from;
608                let mut cnt = 0;
609                while index <= start + to {
610                    mutable.extend(
611                        0,
612                        index.to_usize().unwrap(),
613                        index.to_usize().unwrap() + 1,
614                    );
615                    index += stride;
616                    cnt += 1;
617                }
618                offsets.push(offsets[row_index] + O::usize_as(cnt));
619            } else {
620                let mut index = start + from;
621                let mut cnt = 0;
622                while index >= start + to {
623                    mutable.extend(
624                        0,
625                        index.to_usize().unwrap(),
626                        index.to_usize().unwrap() + 1,
627                    );
628                    index += stride;
629                    cnt += 1;
630                }
631                // invalid range, return empty array
632                offsets.push(offsets[row_index] + O::usize_as(cnt));
633            }
634        } else {
635            // invalid range, return empty array
636            offsets.push(offsets[row_index]);
637        }
638    }
639
640    let data = mutable.freeze();
641
642    Ok(Arc::new(GenericListArray::<O>::try_new(
643        Arc::new(Field::new_list_field(array.value_type(), true)),
644        OffsetBuffer::<O>::new(offsets.into()),
645        arrow::array::make_array(data),
646        null_builder.finish(),
647    )?))
648}
649
650#[user_doc(
651    doc_section(label = "Array Functions"),
652    description = "Returns the array without the first element.",
653    syntax_example = "array_pop_front(array)",
654    sql_example = r#"```sql
655> select array_pop_front([1, 2, 3]);
656+-------------------------------+
657| array_pop_front(List([1,2,3])) |
658+-------------------------------+
659| [2, 3]                        |
660+-------------------------------+
661```"#,
662    argument(
663        name = "array",
664        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
665    )
666)]
667#[derive(Debug)]
668pub(super) struct ArrayPopFront {
669    signature: Signature,
670    aliases: Vec<String>,
671}
672
673impl ArrayPopFront {
674    pub fn new() -> Self {
675        Self {
676            signature: Signature {
677                type_signature: TypeSignature::ArraySignature(
678                    ArrayFunctionSignature::Array {
679                        arguments: vec![ArrayFunctionArgument::Array],
680                        array_coercion: Some(ListCoercion::FixedSizedListToList),
681                    },
682                ),
683                volatility: Volatility::Immutable,
684            },
685            aliases: vec![String::from("list_pop_front")],
686        }
687    }
688}
689
690impl ScalarUDFImpl for ArrayPopFront {
691    fn as_any(&self) -> &dyn Any {
692        self
693    }
694    fn name(&self) -> &str {
695        "array_pop_front"
696    }
697
698    fn signature(&self) -> &Signature {
699        &self.signature
700    }
701
702    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
703        Ok(arg_types[0].clone())
704    }
705
706    fn invoke_with_args(
707        &self,
708        args: datafusion_expr::ScalarFunctionArgs,
709    ) -> Result<ColumnarValue> {
710        make_scalar_function(array_pop_front_inner)(&args.args)
711    }
712
713    fn aliases(&self) -> &[String] {
714        &self.aliases
715    }
716
717    fn documentation(&self) -> Option<&Documentation> {
718        self.doc()
719    }
720}
721
722/// array_pop_front SQL function
723fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
724    let array_data_type = args[0].data_type();
725    match array_data_type {
726        List(_) => {
727            let array = as_list_array(&args[0])?;
728            general_pop_front_list::<i32>(array)
729        }
730        LargeList(_) => {
731            let array = as_large_list_array(&args[0])?;
732            general_pop_front_list::<i64>(array)
733        }
734        _ => exec_err!(
735            "array_pop_front does not support type: {:?}",
736            array_data_type
737        ),
738    }
739}
740
741fn general_pop_front_list<O: OffsetSizeTrait>(
742    array: &GenericListArray<O>,
743) -> Result<ArrayRef>
744where
745    i64: TryInto<O>,
746{
747    let from_array = Int64Array::from(vec![2; array.len()]);
748    let to_array = Int64Array::from(
749        array
750            .iter()
751            .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
752            .collect::<Vec<i64>>(),
753    );
754    general_array_slice::<O>(array, &from_array, &to_array, None)
755}
756
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns the array without the last element.",
    syntax_example = "array_pop_back(array)",
    sql_example = r#"```sql
> select array_pop_back([1, 2, 3]);
+-------------------------------+
| array_pop_back(List([1,2,3])) |
+-------------------------------+
| [1, 2]                        |
+-------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
// Scalar UDF implementing `array_pop_back`; the user-facing documentation is supplied
// through the `user_doc` attribute above.
#[derive(Debug)]
pub(super) struct ArrayPopBack {
    // Accepted argument types, returned from `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative names, returned from `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
779
780impl ArrayPopBack {
781    pub fn new() -> Self {
782        Self {
783            signature: Signature {
784                type_signature: TypeSignature::ArraySignature(
785                    ArrayFunctionSignature::Array {
786                        arguments: vec![ArrayFunctionArgument::Array],
787                        array_coercion: Some(ListCoercion::FixedSizedListToList),
788                    },
789                ),
790                volatility: Volatility::Immutable,
791            },
792            aliases: vec![String::from("list_pop_back")],
793        }
794    }
795}
796
797impl ScalarUDFImpl for ArrayPopBack {
798    fn as_any(&self) -> &dyn Any {
799        self
800    }
801    fn name(&self) -> &str {
802        "array_pop_back"
803    }
804
805    fn signature(&self) -> &Signature {
806        &self.signature
807    }
808
809    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
810        Ok(arg_types[0].clone())
811    }
812
813    fn invoke_with_args(
814        &self,
815        args: datafusion_expr::ScalarFunctionArgs,
816    ) -> Result<ColumnarValue> {
817        make_scalar_function(array_pop_back_inner)(&args.args)
818    }
819
820    fn aliases(&self) -> &[String] {
821        &self.aliases
822    }
823
824    fn documentation(&self) -> Option<&Documentation> {
825        self.doc()
826    }
827}
828
829/// array_pop_back SQL function
830fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
831    let [array] = take_function_args("array_pop_back", args)?;
832
833    match array.data_type() {
834        List(_) => {
835            let array = as_list_array(&array)?;
836            general_pop_back_list::<i32>(array)
837        }
838        LargeList(_) => {
839            let array = as_large_list_array(&array)?;
840            general_pop_back_list::<i64>(array)
841        }
842        _ => exec_err!(
843            "array_pop_back does not support type: {:?}",
844            array.data_type()
845        ),
846    }
847}
848
849fn general_pop_back_list<O: OffsetSizeTrait>(
850    array: &GenericListArray<O>,
851) -> Result<ArrayRef>
852where
853    i64: TryInto<O>,
854{
855    let from_array = Int64Array::from(vec![1; array.len()]);
856    let to_array = Int64Array::from(
857        array
858            .iter()
859            .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
860            .collect::<Vec<i64>>(),
861    );
862    general_array_slice::<O>(array, &from_array, &to_array, None)
863}
864
865#[user_doc(
866    doc_section(label = "Array Functions"),
867    description = "Returns the first non-null element in the array.",
868    syntax_example = "array_any_value(array)",
869    sql_example = r#"```sql
870> select array_any_value([NULL, 1, 2, 3]);
871+-------------------------------+
872| array_any_value(List([NULL,1,2,3])) |
873+-------------------------------------+
874| 1                                   |
875+-------------------------------------+
876```"#,
877    argument(
878        name = "array",
879        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
880    )
881)]
882#[derive(Debug)]
883pub(super) struct ArrayAnyValue {
884    signature: Signature,
885    aliases: Vec<String>,
886}
887
888impl ArrayAnyValue {
889    pub fn new() -> Self {
890        Self {
891            signature: Signature::array(Volatility::Immutable),
892            aliases: vec![String::from("list_any_value")],
893        }
894    }
895}
896
897impl ScalarUDFImpl for ArrayAnyValue {
898    fn as_any(&self) -> &dyn Any {
899        self
900    }
901    fn name(&self) -> &str {
902        "array_any_value"
903    }
904    fn signature(&self) -> &Signature {
905        &self.signature
906    }
907    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
908        match &arg_types[0] {
909            List(field)
910            | LargeList(field)
911            | FixedSizeList(field, _) => Ok(field.data_type().clone()),
912            _ => plan_err!(
913                "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
914            ),
915        }
916    }
917
918    fn invoke_with_args(
919        &self,
920        args: datafusion_expr::ScalarFunctionArgs,
921    ) -> Result<ColumnarValue> {
922        make_scalar_function(array_any_value_inner)(&args.args)
923    }
924
925    fn aliases(&self) -> &[String] {
926        &self.aliases
927    }
928
929    fn documentation(&self) -> Option<&Documentation> {
930        self.doc()
931    }
932}
933
934fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
935    let [array] = take_function_args("array_any_value", args)?;
936
937    match &array.data_type() {
938        List(_) => {
939            let array = as_list_array(&array)?;
940            general_array_any_value::<i32>(array)
941        }
942        LargeList(_) => {
943            let array = as_large_list_array(&array)?;
944            general_array_any_value::<i64>(array)
945        }
946        data_type => exec_err!("array_any_value does not support type: {:?}", data_type),
947    }
948}
949
950fn general_array_any_value<O: OffsetSizeTrait>(
951    array: &GenericListArray<O>,
952) -> Result<ArrayRef>
953where
954    i64: TryInto<O>,
955{
956    let values = array.values();
957    let original_data = values.to_data();
958    let capacity = Capacities::Array(array.len());
959
960    let mut mutable =
961        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
962
963    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
964        let start = offset_window[0];
965        let end = offset_window[1];
966        let len = end - start;
967
968        // array is null
969        if len == O::usize_as(0) {
970            mutable.extend_nulls(1);
971            continue;
972        }
973
974        let row_value = array.value(row_index);
975        match row_value.nulls() {
976            Some(row_nulls_buffer) => {
977                // nulls are present in the array so try to take the first valid element
978                if let Some(first_non_null_index) =
979                    row_nulls_buffer.valid_indices().next()
980                {
981                    let index = start.as_usize() + first_non_null_index;
982                    mutable.extend(0, index, index + 1)
983                } else {
984                    // all the elements in the array are null
985                    mutable.extend_nulls(1);
986                }
987            }
988            None => {
989                // no nulls are present in the array so take the first element
990                let index = start.as_usize();
991                mutable.extend(0, index, index + 1);
992            }
993        }
994    }
995
996    let data = mutable.freeze();
997    Ok(arrow::array::make_array(data))
998}
999
#[cfg(test)]
mod tests {
    use super::array_element_udf;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;

    /// Regression test for https://github.com/apache/datafusion/issues/13755:
    /// `array_element` over a list of fixed-size lists must report the
    /// fixed-size list type as its return type on both lookup paths.
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let element_type = DataType::FixedSizeList(
            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
            13,
        );
        let list_type =
            DataType::List(Field::new_list_field(element_type.clone(), true).into());
        let index_type = DataType::Int64;

        let fields = vec![
            Field::new("my_array", list_type.clone(), false),
            Field::new("my_index", index_type.clone(), false),
        ];
        let schema =
            DFSchema::from_unqualified_fields(fields.into(), HashMap::default())
                .unwrap();

        // Path 1: asking the UDF directly (ScalarUDFImpl::return_type).
        let udf = array_element_udf();
        let direct = udf
            .return_type(&[list_type.clone(), index_type.clone()])
            .unwrap();
        assert_eq!(direct, element_type);

        // Path 2: resolving the type of a call expression against the schema
        // via ExprSchemable::get_type (e.g. SimplifyInfo).
        let call_args = vec![
            Expr::Column(Column::new_unqualified("my_array")),
            Expr::Column(Column::new_unqualified("my_index")),
        ];
        let call = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: call_args,
        });
        let via_schema = ExprSchemable::get_type(&call, &schema).unwrap();
        assert_eq!(via_schema, element_type);
    }
}