datafusion_functions_nested/
extract.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions.
19
20use arrow::array::{
21    Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array,
22    MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27    DataType::{FixedSizeList, LargeList, List, Null},
28    Field,
29};
30use datafusion_common::cast::as_int64_array;
31use datafusion_common::cast::as_large_list_array;
32use datafusion_common::cast::as_list_array;
33use datafusion_common::utils::ListCoercion;
34use datafusion_common::{
35    exec_err, internal_datafusion_err, plan_err, utils::take_function_args,
36    DataFusionError, Result,
37};
38use datafusion_expr::{
39    ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
40};
41use datafusion_expr::{
42    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use std::any::Any;
46use std::sync::Arc;
47
48use crate::utils::make_scalar_function;
49
// Create static instances of ScalarUDFs for each function.
//
// Each `make_udf_expr_and_func!` call below lists, in order: the UDF struct,
// the name of the expression-builder function, the builder's argument
// name(s), a one-line description, and the name of the UDF accessor function.
// NOTE(review): the macros are defined outside this file; the meaning of the
// positional arguments is read off these call sites — confirm against the
// macro definitions.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `array_slice` only gets its UDF accessor here; the expression builder
// `array_slice(array, begin, end, stride)` is written by hand further down,
// where it takes `stride` as an `Option<Expr>`.
create_func!(ArraySlice, array_slice_udf);

make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
84
// UDF type for `array_element(array, index)`.
// The `user_doc` attribute carries the user-facing documentation; the
// `documentation()` trait method of this type returns `self.doc()`
// (presumably generated by this attribute — confirm against the macro).
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Extracts the element with the index n from the array.",
    syntax_example = "array_element(array, index)",
    sql_example = r#"```sql
> select array_element([1, 2, 3, 4], 3);
+-----------------------------------------+
| array_element(List([1,2,3,4]),Int64(3)) |
+-----------------------------------------+
| 3                                       |
+-----------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "index",
        description = "Index to extract the element from the array."
    )
)]
#[derive(Debug)]
pub struct ArrayElement {
    // Accepted argument types and volatility; set in `ArrayElement::new`.
    signature: Signature,
    // Alternative function names, handed out by `aliases()`.
    aliases: Vec<String>,
}
111
112impl Default for ArrayElement {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
118impl ArrayElement {
119    pub fn new() -> Self {
120        Self {
121            signature: Signature::array_and_index(Volatility::Immutable),
122            aliases: vec![
123                String::from("array_extract"),
124                String::from("list_element"),
125                String::from("list_extract"),
126            ],
127        }
128    }
129}
130
131impl ScalarUDFImpl for ArrayElement {
132    fn as_any(&self) -> &dyn Any {
133        self
134    }
135    fn name(&self) -> &str {
136        "array_element"
137    }
138
139    fn display_name(&self, args: &[Expr]) -> Result<String> {
140        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141        if args_name.len() != 2 {
142            return exec_err!("expect 2 args, got {}", args_name.len());
143        }
144
145        Ok(format!("{}[{}]", args_name[0], args_name[1]))
146    }
147
148    fn schema_name(&self, args: &[Expr]) -> Result<String> {
149        let args_name = args
150            .iter()
151            .map(|e| e.schema_name().to_string())
152            .collect::<Vec<_>>();
153        if args_name.len() != 2 {
154            return exec_err!("expect 2 args, got {}", args_name.len());
155        }
156
157        Ok(format!("{}[{}]", args_name[0], args_name[1]))
158    }
159
160    fn signature(&self) -> &Signature {
161        &self.signature
162    }
163
164    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165        match &arg_types[0] {
166            Null => Ok(Null),
167            List(field) | LargeList(field) => Ok(field.data_type().clone()),
168            arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
169        }
170    }
171
172    fn invoke_with_args(
173        &self,
174        args: datafusion_expr::ScalarFunctionArgs,
175    ) -> Result<ColumnarValue> {
176        make_scalar_function(array_element_inner)(&args.args)
177    }
178
179    fn aliases(&self) -> &[String] {
180        &self.aliases
181    }
182
183    fn documentation(&self) -> Option<&Documentation> {
184        self.doc()
185    }
186}
187
188/// array_element SQL function
189///
190/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
191/// `array_element(array, index)`
192///
193/// For example:
194/// > array_element(\[1, 2, 3], 2) -> 2
195fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
196    let [array, indexes] = take_function_args("array_element", args)?;
197
198    match &array.data_type() {
199        Null => Ok(Arc::new(NullArray::new(array.len()))),
200        List(_) => {
201            let array = as_list_array(&array)?;
202            let indexes = as_int64_array(&indexes)?;
203            general_array_element::<i32>(array, indexes)
204        }
205        LargeList(_) => {
206            let array = as_large_list_array(&array)?;
207            let indexes = as_int64_array(&indexes)?;
208            general_array_element::<i64>(array, indexes)
209        }
210        arg_type => {
211            exec_err!("array_element does not support type {arg_type}")
212        }
213    }
214}
215
216fn general_array_element<O: OffsetSizeTrait>(
217    array: &GenericListArray<O>,
218    indexes: &Int64Array,
219) -> Result<ArrayRef>
220where
221    i64: TryInto<O>,
222{
223    let values = array.values();
224    if values.data_type().is_null() {
225        return Ok(Arc::new(NullArray::new(array.len())));
226    }
227
228    let original_data = values.to_data();
229    let capacity = Capacities::Array(original_data.len());
230
231    // use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
232    let mut mutable =
233        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
234
235    fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
236    where
237        i64: TryInto<O>,
238    {
239        let index: O = index.try_into().map_err(|_| {
240            DataFusionError::Execution(format!(
241                "array_element got invalid index: {index}"
242            ))
243        })?;
244        // 0 ~ len - 1
245        let adjusted_zero_index = if index < O::usize_as(0) {
246            index + len
247        } else {
248            index - O::usize_as(1)
249        };
250
251        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
252            Ok(Some(adjusted_zero_index))
253        } else {
254            // Out of bounds
255            Ok(None)
256        }
257    }
258
259    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
260        let start = offset_window[0];
261        let end = offset_window[1];
262        let len = end - start;
263
264        // array is null
265        if len == O::usize_as(0) {
266            mutable.extend_nulls(1);
267            continue;
268        }
269
270        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
271
272        if let Some(index) = index {
273            let start = start.as_usize() + index.as_usize();
274            mutable.extend(0, start, start + 1_usize);
275        } else {
276            // Index out of bounds
277            mutable.extend_nulls(1);
278        }
279    }
280
281    let data = mutable.freeze();
282    Ok(arrow::array::make_array(data))
283}
284
285#[doc = "returns a slice of the array."]
286pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
287    let args = match stride {
288        Some(stride) => vec![array, begin, end, stride],
289        None => vec![array, begin, end],
290    };
291    array_slice_udf().call(args)
292}
293
// UDF type for `array_slice(array, begin, end[, stride])`.
// The `user_doc` attribute carries the user-facing documentation; the
// `documentation()` trait method of this type returns `self.doc()`
// (presumably generated by this attribute — confirm against the macro).
// NOTE(review): `syntax_example` shows only three arguments although a
// `stride` argument is documented below and accepted by the signature.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns a slice of the array based on 1-indexed start and end positions.",
    syntax_example = "array_slice(array, begin, end)",
    sql_example = r#"```sql
> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
+--------------------------------------------------------+
| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
+--------------------------------------------------------+
| [3, 4, 5, 6]                                           |
+--------------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "begin",
        description = "Index of the first element. If negative, it counts backward from the end of the array."
    ),
    argument(
        name = "end",
        description = "Index of the last element. If negative, it counts backward from the end of the array."
    ),
    argument(
        name = "stride",
        description = "Stride of the array slice. The default is 1."
    )
)]
#[derive(Debug)]
pub(super) struct ArraySlice {
    // One-of signature accepting three or four arguments; set in `ArraySlice::new`.
    signature: Signature,
    // Alternative function names, handed out by `aliases()`.
    aliases: Vec<String>,
}
328
329impl ArraySlice {
330    pub fn new() -> Self {
331        Self {
332            signature: Signature::one_of(
333                vec![
334                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
335                        arguments: vec![
336                            ArrayFunctionArgument::Array,
337                            ArrayFunctionArgument::Index,
338                            ArrayFunctionArgument::Index,
339                        ],
340                        array_coercion: None,
341                    }),
342                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
343                        arguments: vec![
344                            ArrayFunctionArgument::Array,
345                            ArrayFunctionArgument::Index,
346                            ArrayFunctionArgument::Index,
347                            ArrayFunctionArgument::Index,
348                        ],
349                        array_coercion: None,
350                    }),
351                ],
352                Volatility::Immutable,
353            ),
354            aliases: vec![String::from("list_slice")],
355        }
356    }
357}
358
359impl ScalarUDFImpl for ArraySlice {
360    fn as_any(&self) -> &dyn Any {
361        self
362    }
363
364    fn display_name(&self, args: &[Expr]) -> Result<String> {
365        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
366        if let Some((arr, indexes)) = args_name.split_first() {
367            Ok(format!("{arr}[{}]", indexes.join(":")))
368        } else {
369            exec_err!("no argument")
370        }
371    }
372
373    fn schema_name(&self, args: &[Expr]) -> Result<String> {
374        let args_name = args
375            .iter()
376            .map(|e| e.schema_name().to_string())
377            .collect::<Vec<_>>();
378        if let Some((arr, indexes)) = args_name.split_first() {
379            Ok(format!("{arr}[{}]", indexes.join(":")))
380        } else {
381            exec_err!("no argument")
382        }
383    }
384
385    fn name(&self) -> &str {
386        "array_slice"
387    }
388
389    fn signature(&self) -> &Signature {
390        &self.signature
391    }
392
393    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
394        Ok(arg_types[0].clone())
395    }
396
397    fn invoke_with_args(
398        &self,
399        args: datafusion_expr::ScalarFunctionArgs,
400    ) -> Result<ColumnarValue> {
401        make_scalar_function(array_slice_inner)(&args.args)
402    }
403
404    fn aliases(&self) -> &[String] {
405        &self.aliases
406    }
407
408    fn documentation(&self) -> Option<&Documentation> {
409        self.doc()
410    }
411}
412
413/// array_slice SQL function
414///
415/// We follow the behavior of array_slice in DuckDB
416/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
417///
418/// > array_slice(array, from, to)
419///
420/// Positive index is treated as the index from the start of the array. If the
421/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
422/// length of the array, it is treated as the length of the array.
423///
424/// Negative index is treated as the index from the end of the array. If the index
425/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
426/// The `to` index is exclusive like python slice syntax.
427///
428/// See test cases in `array.slt` for more details.
429fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
430    let args_len = args.len();
431    if args_len != 3 && args_len != 4 {
432        return exec_err!("array_slice needs three or four arguments");
433    }
434
435    let stride = if args_len == 4 {
436        Some(as_int64_array(&args[3])?)
437    } else {
438        None
439    };
440
441    let from_array = as_int64_array(&args[1])?;
442    let to_array = as_int64_array(&args[2])?;
443
444    let array_data_type = args[0].data_type();
445    match array_data_type {
446        List(_) => {
447            let array = as_list_array(&args[0])?;
448            general_array_slice::<i32>(array, from_array, to_array, stride)
449        }
450        LargeList(_) => {
451            let array = as_large_list_array(&args[0])?;
452            general_array_slice::<i64>(array, from_array, to_array, stride)
453        }
454        _ => exec_err!("array_slice does not support type: {:?}", array_data_type),
455    }
456}
457
458fn general_array_slice<O: OffsetSizeTrait>(
459    array: &GenericListArray<O>,
460    from_array: &Int64Array,
461    to_array: &Int64Array,
462    stride: Option<&Int64Array>,
463) -> Result<ArrayRef>
464where
465    i64: TryInto<O>,
466{
467    let values = array.values();
468    let original_data = values.to_data();
469    let capacity = Capacities::Array(original_data.len());
470
471    let mut mutable =
472        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
473
474    // We have the slice syntax compatible with DuckDB v0.8.1.
475    // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
476
477    fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
478    where
479        i64: TryInto<O>,
480    {
481        // 0 ~ len - 1
482        let adjusted_zero_index = if index < 0 {
483            if let Ok(index) = index.try_into() {
484                // When index < 0 and -index > length, index is clamped to the beginning of the list.
485                // Otherwise, when index < 0, the index is counted from the end of the list.
486                //
487                // Note, we actually test the contrapositive, index < -length, because negating a
488                // negative will panic if the negative is equal to the smallest representable value
489                // while negating a positive is always safe.
490                if index < (O::zero() - O::one()) * len {
491                    O::zero()
492                } else {
493                    index + len
494                }
495            } else {
496                return exec_err!("array_slice got invalid index: {}", index);
497            }
498        } else {
499            // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
500            if let Ok(index) = index.try_into() {
501                std::cmp::max(index - O::usize_as(1), O::usize_as(0))
502            } else {
503                return exec_err!("array_slice got invalid index: {}", index);
504            }
505        };
506
507        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
508            Ok(Some(adjusted_zero_index))
509        } else {
510            // Out of bounds
511            Ok(None)
512        }
513    }
514
515    fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
516    where
517        i64: TryInto<O>,
518    {
519        // 0 ~ len - 1
520        let adjusted_zero_index = if index < 0 {
521            // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
522            if let Ok(index) = index.try_into() {
523                index + len
524            } else {
525                return exec_err!("array_slice got invalid index: {}", index);
526            }
527        } else {
528            // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
529            if let Ok(index) = index.try_into() {
530                std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
531            } else {
532                return exec_err!("array_slice got invalid index: {}", index);
533            }
534        };
535
536        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
537            Ok(Some(adjusted_zero_index))
538        } else {
539            // Out of bounds
540            Ok(None)
541        }
542    }
543
544    let mut offsets = vec![O::usize_as(0)];
545    let mut null_builder = NullBufferBuilder::new(array.len());
546
547    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
548        let start = offset_window[0];
549        let end = offset_window[1];
550        let len = end - start;
551
552        // If any input is null, return null.
553        if array.is_null(row_index)
554            || from_array.is_null(row_index)
555            || to_array.is_null(row_index)
556        {
557            mutable.extend_nulls(1);
558            offsets.push(offsets[row_index] + O::usize_as(1));
559            null_builder.append_null();
560            continue;
561        }
562        null_builder.append_non_null();
563
564        // Empty arrays always return an empty array.
565        if len == O::usize_as(0) {
566            offsets.push(offsets[row_index]);
567            continue;
568        }
569
570        let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
571        let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
572
573        if let (Some(from), Some(to)) = (from_index, to_index) {
574            let stride = stride.map(|s| s.value(row_index));
575            // Default stride is 1 if not provided
576            let stride = stride.unwrap_or(1);
577            if stride.is_zero() {
578                return exec_err!(
579                    "array_slice got invalid stride: {:?}, it cannot be 0",
580                    stride
581                );
582            } else if (from < to && stride.is_negative())
583                || (from > to && stride.is_positive())
584            {
585                // return empty array
586                offsets.push(offsets[row_index]);
587                continue;
588            }
589
590            let stride: O = stride.try_into().map_err(|_| {
591                internal_datafusion_err!("array_slice got invalid stride: {}", stride)
592            })?;
593
594            if from <= to && stride > O::zero() {
595                assert!(start + to <= end);
596                if stride.eq(&O::one()) {
597                    // stride is default to 1
598                    mutable.extend(
599                        0,
600                        (start + from).to_usize().unwrap(),
601                        (start + to + O::usize_as(1)).to_usize().unwrap(),
602                    );
603                    offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
604                    continue;
605                }
606                let mut index = start + from;
607                let mut cnt = 0;
608                while index <= start + to {
609                    mutable.extend(
610                        0,
611                        index.to_usize().unwrap(),
612                        index.to_usize().unwrap() + 1,
613                    );
614                    index += stride;
615                    cnt += 1;
616                }
617                offsets.push(offsets[row_index] + O::usize_as(cnt));
618            } else {
619                let mut index = start + from;
620                let mut cnt = 0;
621                while index >= start + to {
622                    mutable.extend(
623                        0,
624                        index.to_usize().unwrap(),
625                        index.to_usize().unwrap() + 1,
626                    );
627                    index += stride;
628                    cnt += 1;
629                }
630                // invalid range, return empty array
631                offsets.push(offsets[row_index] + O::usize_as(cnt));
632            }
633        } else {
634            // invalid range, return empty array
635            offsets.push(offsets[row_index]);
636        }
637    }
638
639    let data = mutable.freeze();
640
641    Ok(Arc::new(GenericListArray::<O>::try_new(
642        Arc::new(Field::new_list_field(array.value_type(), true)),
643        OffsetBuffer::<O>::new(offsets.into()),
644        arrow::array::make_array(data),
645        null_builder.finish(),
646    )?))
647}
648
649#[user_doc(
650    doc_section(label = "Array Functions"),
651    description = "Returns the array without the first element.",
652    syntax_example = "array_pop_front(array)",
653    sql_example = r#"```sql
654> select array_pop_front([1, 2, 3]);
655+-------------------------------+
656| array_pop_front(List([1,2,3])) |
657+-------------------------------+
658| [2, 3]                        |
659+-------------------------------+
660```"#,
661    argument(
662        name = "array",
663        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
664    )
665)]
666#[derive(Debug)]
667pub(super) struct ArrayPopFront {
668    signature: Signature,
669    aliases: Vec<String>,
670}
671
672impl ArrayPopFront {
673    pub fn new() -> Self {
674        Self {
675            signature: Signature {
676                type_signature: TypeSignature::ArraySignature(
677                    ArrayFunctionSignature::Array {
678                        arguments: vec![ArrayFunctionArgument::Array],
679                        array_coercion: Some(ListCoercion::FixedSizedListToList),
680                    },
681                ),
682                volatility: Volatility::Immutable,
683            },
684            aliases: vec![String::from("list_pop_front")],
685        }
686    }
687}
688
689impl ScalarUDFImpl for ArrayPopFront {
690    fn as_any(&self) -> &dyn Any {
691        self
692    }
693    fn name(&self) -> &str {
694        "array_pop_front"
695    }
696
697    fn signature(&self) -> &Signature {
698        &self.signature
699    }
700
701    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
702        Ok(arg_types[0].clone())
703    }
704
705    fn invoke_with_args(
706        &self,
707        args: datafusion_expr::ScalarFunctionArgs,
708    ) -> Result<ColumnarValue> {
709        make_scalar_function(array_pop_front_inner)(&args.args)
710    }
711
712    fn aliases(&self) -> &[String] {
713        &self.aliases
714    }
715
716    fn documentation(&self) -> Option<&Documentation> {
717        self.doc()
718    }
719}
720
721/// array_pop_front SQL function
722fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
723    let array_data_type = args[0].data_type();
724    match array_data_type {
725        List(_) => {
726            let array = as_list_array(&args[0])?;
727            general_pop_front_list::<i32>(array)
728        }
729        LargeList(_) => {
730            let array = as_large_list_array(&args[0])?;
731            general_pop_front_list::<i64>(array)
732        }
733        _ => exec_err!(
734            "array_pop_front does not support type: {:?}",
735            array_data_type
736        ),
737    }
738}
739
740fn general_pop_front_list<O: OffsetSizeTrait>(
741    array: &GenericListArray<O>,
742) -> Result<ArrayRef>
743where
744    i64: TryInto<O>,
745{
746    let from_array = Int64Array::from(vec![2; array.len()]);
747    let to_array = Int64Array::from(
748        array
749            .iter()
750            .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
751            .collect::<Vec<i64>>(),
752    );
753    general_array_slice::<O>(array, &from_array, &to_array, None)
754}
755
// UDF type for `array_pop_back(array)`.
// The `user_doc` attribute carries the user-facing documentation; the
// `documentation()` trait method of this type returns `self.doc()`
// (presumably generated by this attribute — confirm against the macro).
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns the array without the last element.",
    syntax_example = "array_pop_back(array)",
    sql_example = r#"```sql
> select array_pop_back([1, 2, 3]);
+-------------------------------+
| array_pop_back(List([1,2,3])) |
+-------------------------------+
| [1, 2]                        |
+-------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug)]
pub(super) struct ArrayPopBack {
    // Single-array signature; set in `ArrayPopBack::new`.
    signature: Signature,
    // Alternative function names, handed out by `aliases()`.
    aliases: Vec<String>,
}
778
779impl ArrayPopBack {
780    pub fn new() -> Self {
781        Self {
782            signature: Signature {
783                type_signature: TypeSignature::ArraySignature(
784                    ArrayFunctionSignature::Array {
785                        arguments: vec![ArrayFunctionArgument::Array],
786                        array_coercion: Some(ListCoercion::FixedSizedListToList),
787                    },
788                ),
789                volatility: Volatility::Immutable,
790            },
791            aliases: vec![String::from("list_pop_back")],
792        }
793    }
794}
795
796impl ScalarUDFImpl for ArrayPopBack {
797    fn as_any(&self) -> &dyn Any {
798        self
799    }
800    fn name(&self) -> &str {
801        "array_pop_back"
802    }
803
804    fn signature(&self) -> &Signature {
805        &self.signature
806    }
807
808    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
809        Ok(arg_types[0].clone())
810    }
811
812    fn invoke_with_args(
813        &self,
814        args: datafusion_expr::ScalarFunctionArgs,
815    ) -> Result<ColumnarValue> {
816        make_scalar_function(array_pop_back_inner)(&args.args)
817    }
818
819    fn aliases(&self) -> &[String] {
820        &self.aliases
821    }
822
823    fn documentation(&self) -> Option<&Documentation> {
824        self.doc()
825    }
826}
827
828/// array_pop_back SQL function
829fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
830    let [array] = take_function_args("array_pop_back", args)?;
831
832    match array.data_type() {
833        List(_) => {
834            let array = as_list_array(&array)?;
835            general_pop_back_list::<i32>(array)
836        }
837        LargeList(_) => {
838            let array = as_large_list_array(&array)?;
839            general_pop_back_list::<i64>(array)
840        }
841        _ => exec_err!(
842            "array_pop_back does not support type: {:?}",
843            array.data_type()
844        ),
845    }
846}
847
848fn general_pop_back_list<O: OffsetSizeTrait>(
849    array: &GenericListArray<O>,
850) -> Result<ArrayRef>
851where
852    i64: TryInto<O>,
853{
854    let from_array = Int64Array::from(vec![1; array.len()]);
855    let to_array = Int64Array::from(
856        array
857            .iter()
858            .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
859            .collect::<Vec<i64>>(),
860    );
861    general_array_slice::<O>(array, &from_array, &to_array, None)
862}
863
864#[user_doc(
865    doc_section(label = "Array Functions"),
866    description = "Returns the first non-null element in the array.",
867    syntax_example = "array_any_value(array)",
868    sql_example = r#"```sql
869> select array_any_value([NULL, 1, 2, 3]);
870+-------------------------------+
871| array_any_value(List([NULL,1,2,3])) |
872+-------------------------------------+
873| 1                                   |
874+-------------------------------------+
875```"#,
876    argument(
877        name = "array",
878        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
879    )
880)]
881#[derive(Debug)]
882pub(super) struct ArrayAnyValue {
883    signature: Signature,
884    aliases: Vec<String>,
885}
886
887impl ArrayAnyValue {
888    pub fn new() -> Self {
889        Self {
890            signature: Signature::array(Volatility::Immutable),
891            aliases: vec![String::from("list_any_value")],
892        }
893    }
894}
895
896impl ScalarUDFImpl for ArrayAnyValue {
897    fn as_any(&self) -> &dyn Any {
898        self
899    }
900    fn name(&self) -> &str {
901        "array_any_value"
902    }
903    fn signature(&self) -> &Signature {
904        &self.signature
905    }
906    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
907        match &arg_types[0] {
908            List(field)
909            | LargeList(field)
910            | FixedSizeList(field, _) => Ok(field.data_type().clone()),
911            _ => plan_err!(
912                "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
913            ),
914        }
915    }
916
917    fn invoke_with_args(
918        &self,
919        args: datafusion_expr::ScalarFunctionArgs,
920    ) -> Result<ColumnarValue> {
921        make_scalar_function(array_any_value_inner)(&args.args)
922    }
923
924    fn aliases(&self) -> &[String] {
925        &self.aliases
926    }
927
928    fn documentation(&self) -> Option<&Documentation> {
929        self.doc()
930    }
931}
932
933fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
934    let [array] = take_function_args("array_any_value", args)?;
935
936    match &array.data_type() {
937        List(_) => {
938            let array = as_list_array(&array)?;
939            general_array_any_value::<i32>(array)
940        }
941        LargeList(_) => {
942            let array = as_large_list_array(&array)?;
943            general_array_any_value::<i64>(array)
944        }
945        data_type => exec_err!("array_any_value does not support type: {:?}", data_type),
946    }
947}
948
949fn general_array_any_value<O: OffsetSizeTrait>(
950    array: &GenericListArray<O>,
951) -> Result<ArrayRef>
952where
953    i64: TryInto<O>,
954{
955    let values = array.values();
956    let original_data = values.to_data();
957    let capacity = Capacities::Array(array.len());
958
959    let mut mutable =
960        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
961
962    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
963        let start = offset_window[0];
964        let end = offset_window[1];
965        let len = end - start;
966
967        // array is null
968        if len == O::usize_as(0) {
969            mutable.extend_nulls(1);
970            continue;
971        }
972
973        let row_value = array.value(row_index);
974        match row_value.nulls() {
975            Some(row_nulls_buffer) => {
976                // nulls are present in the array so try to take the first valid element
977                if let Some(first_non_null_index) =
978                    row_nulls_buffer.valid_indices().next()
979                {
980                    let index = start.as_usize() + first_non_null_index;
981                    mutable.extend(0, index, index + 1)
982                } else {
983                    // all the elements in the array are null
984                    mutable.extend_nulls(1);
985                }
986            }
987            None => {
988                // no nulls are present in the array so take the first element
989                let index = start.as_usize();
990                mutable.extend(0, index, index + 1);
991            }
992        }
993    }
994
995    let data = mutable.freeze();
996    Ok(arrow::array::make_array(data))
997}
998
#[cfg(test)]
mod tests {
    use super::array_element_udf;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;

    // Regression test for https://github.com/apache/datafusion/issues/13755
    //
    // `array_element` over a `List` of `FixedSizeList` must report the
    // `FixedSizeList` element type, both when the UDF is asked directly and
    // when the type is resolved through an expression against a schema.
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let element_type = DataType::FixedSizeList(
            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
            13,
        );
        let list_type =
            DataType::List(Field::new_list_field(element_type.clone(), true).into());
        let index_type = DataType::Int64;

        let fields = vec![
            Field::new("my_array", list_type.clone(), false),
            Field::new("my_index", index_type.clone(), false),
        ];
        let schema =
            DFSchema::from_unqualified_fields(fields.into(), HashMap::default())
                .unwrap();

        // ScalarUDFImpl::return_type
        let direct = array_element_udf()
            .return_type(&[list_type, index_type])
            .unwrap();
        assert_eq!(direct, element_type);

        // Via ExprSchemable::get_type (e.g. SimplifyInfo)
        let column = |name: &str| Expr::Column(Column::new_unqualified(name));
        let udf_expr = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: vec![column("my_array"), column("my_index")],
        });
        let resolved = ExprSchemable::get_type(&udf_expr, &schema).unwrap();
        assert_eq!(resolved, element_type);
    }
}