datafusion_functions_nested/
extract.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions.
19
20use arrow::array::{
21    Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray, Int64Array,
22    MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::{OffsetBuffer, ScalarBuffer};
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27    DataType::{FixedSizeList, LargeList, LargeListView, List, ListView, Null},
28    Field,
29};
30use datafusion_common::cast::as_large_list_array;
31use datafusion_common::cast::as_list_array;
32use datafusion_common::cast::{
33    as_int64_array, as_large_list_view_array, as_list_view_array,
34};
35use datafusion_common::internal_err;
36use datafusion_common::utils::ListCoercion;
37use datafusion_common::{
38    Result, exec_datafusion_err, exec_err, internal_datafusion_err, plan_err,
39    utils::take_function_args,
40};
41use datafusion_expr::{
42    ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
43};
44use datafusion_expr::{
45    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
46};
47use datafusion_macros::user_doc;
48use std::any::Any;
49use std::sync::Arc;
50
51use crate::utils::make_scalar_function;
52
// Create static instances of ScalarUDFs for each function.
//
// NOTE(review): both macros are defined outside this file. Judging from the
// call sites, `make_udf_expr_and_func!` takes the UDF struct, the name of the
// expression-building function, that function's argument names, a doc string,
// and the name of the UDF accessor function — confirm against the macro.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// Only the `array_slice_udf` name is passed here; the `array_slice`
// expression builder is written by hand in this file because it accepts an
// optional `stride` argument.
create_func!(ArraySlice, array_slice_udf);

make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
87
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Extracts the element with the index n from the array.",
    syntax_example = "array_element(array, index)",
    sql_example = r#"```sql
> select array_element([1, 2, 3, 4], 3);
+-----------------------------------------+
| array_element(List([1,2,3,4]),Int64(3)) |
+-----------------------------------------+
| 3                                       |
+-----------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "index",
        description = "Index to extract the element from the array."
    )
)]
// Scalar UDF behind `array_element(array, index)`; the execution logic lives
// in `array_element_inner`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayElement {
    // Accepted argument types (built with `Signature::array_and_index`).
    signature: Signature,
    // Alternative SQL names: `array_extract`, `list_element`, `list_extract`.
    aliases: Vec<String>,
}
114
115impl Default for ArrayElement {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl ArrayElement {
122    pub fn new() -> Self {
123        Self {
124            signature: Signature::array_and_index(Volatility::Immutable),
125            aliases: vec![
126                String::from("array_extract"),
127                String::from("list_element"),
128                String::from("list_extract"),
129            ],
130        }
131    }
132}
133
134impl ScalarUDFImpl for ArrayElement {
135    fn as_any(&self) -> &dyn Any {
136        self
137    }
138    fn name(&self) -> &str {
139        "array_element"
140    }
141
142    fn display_name(&self, args: &[Expr]) -> Result<String> {
143        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
144        if args_name.len() != 2 {
145            return exec_err!("expect 2 args, got {}", args_name.len());
146        }
147
148        Ok(format!("{}[{}]", args_name[0], args_name[1]))
149    }
150
151    fn schema_name(&self, args: &[Expr]) -> Result<String> {
152        let args_name = args
153            .iter()
154            .map(|e| e.schema_name().to_string())
155            .collect::<Vec<_>>();
156        if args_name.len() != 2 {
157            return exec_err!("expect 2 args, got {}", args_name.len());
158        }
159
160        Ok(format!("{}[{}]", args_name[0], args_name[1]))
161    }
162
163    fn signature(&self) -> &Signature {
164        &self.signature
165    }
166
167    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
168        match &arg_types[0] {
169            Null => Ok(Null),
170            List(field) | LargeList(field) => Ok(field.data_type().clone()),
171            arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
172        }
173    }
174
175    fn invoke_with_args(
176        &self,
177        args: datafusion_expr::ScalarFunctionArgs,
178    ) -> Result<ColumnarValue> {
179        make_scalar_function(array_element_inner)(&args.args)
180    }
181
182    fn aliases(&self) -> &[String] {
183        &self.aliases
184    }
185
186    fn documentation(&self) -> Option<&Documentation> {
187        self.doc()
188    }
189}
190
191/// array_element SQL function
192///
193/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
194/// `array_element(array, index)`
195///
196/// For example:
197/// > array_element(\[1, 2, 3], 2) -> 2
198fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
199    let [array, indexes] = take_function_args("array_element", args)?;
200
201    match &array.data_type() {
202        Null => Ok(Arc::new(NullArray::new(array.len()))),
203        List(_) => {
204            let array = as_list_array(&array)?;
205            let indexes = as_int64_array(&indexes)?;
206            general_array_element::<i32>(array, indexes)
207        }
208        LargeList(_) => {
209            let array = as_large_list_array(&array)?;
210            let indexes = as_int64_array(&indexes)?;
211            general_array_element::<i64>(array, indexes)
212        }
213        arg_type => {
214            exec_err!("array_element does not support type {arg_type}")
215        }
216    }
217}
218
219fn general_array_element<O: OffsetSizeTrait>(
220    array: &GenericListArray<O>,
221    indexes: &Int64Array,
222) -> Result<ArrayRef>
223where
224    i64: TryInto<O>,
225{
226    let values = array.values();
227    if values.data_type().is_null() {
228        return Ok(Arc::new(NullArray::new(array.len())));
229    }
230
231    let original_data = values.to_data();
232    let capacity = Capacities::Array(original_data.len());
233
234    // use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
235    let mut mutable =
236        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
237
238    fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
239    where
240        i64: TryInto<O>,
241    {
242        let index: O = index.try_into().map_err(|_| {
243            exec_datafusion_err!("array_element got invalid index: {index}")
244        })?;
245        // 0 ~ len - 1
246        let adjusted_zero_index = if index < O::usize_as(0) {
247            index + len
248        } else {
249            index - O::usize_as(1)
250        };
251
252        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
253            Ok(Some(adjusted_zero_index))
254        } else {
255            // Out of bounds
256            Ok(None)
257        }
258    }
259
260    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
261        let start = offset_window[0];
262        let end = offset_window[1];
263        let len = end - start;
264
265        // array is null
266        if len == O::usize_as(0) {
267            mutable.extend_nulls(1);
268            continue;
269        }
270
271        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
272
273        if let Some(index) = index {
274            let start = start.as_usize() + index.as_usize();
275            mutable.extend(0, start, start + 1_usize);
276        } else {
277            // Index out of bounds
278            mutable.extend_nulls(1);
279        }
280    }
281
282    let data = mutable.freeze();
283    Ok(arrow::array::make_array(data))
284}
285
286#[doc = "returns a slice of the array."]
287pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
288    let args = match stride {
289        Some(stride) => vec![array, begin, end, stride],
290        None => vec![array, begin, end],
291    };
292    array_slice_udf().call(args)
293}
294
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns a slice of the array based on 1-indexed start and end positions.",
    syntax_example = "array_slice(array, begin, end)",
    sql_example = r#"```sql
> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
+--------------------------------------------------------+
| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
+--------------------------------------------------------+
| [3, 4, 5, 6]                                           |
+--------------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "begin",
        description = "Index of the first element. If negative, it counts backward from the end of the array."
    ),
    argument(
        name = "end",
        description = "Index of the last element. If negative, it counts backward from the end of the array."
    ),
    argument(
        name = "stride",
        description = "Stride of the array slice. The default is 1."
    )
)]
// Scalar UDF behind `array_slice(array, begin, end[, stride])`; the execution
// logic lives in `array_slice_inner`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct ArraySlice {
    // Accepted argument types: the three- and four-argument overloads.
    signature: Signature,
    // Alternative SQL names: `list_slice`.
    aliases: Vec<String>,
}
329
330impl ArraySlice {
331    pub fn new() -> Self {
332        Self {
333            signature: Signature::one_of(
334                vec![
335                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
336                        arguments: vec![
337                            ArrayFunctionArgument::Array,
338                            ArrayFunctionArgument::Index,
339                            ArrayFunctionArgument::Index,
340                        ],
341                        array_coercion: Some(ListCoercion::FixedSizedListToList),
342                    }),
343                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
344                        arguments: vec![
345                            ArrayFunctionArgument::Array,
346                            ArrayFunctionArgument::Index,
347                            ArrayFunctionArgument::Index,
348                            ArrayFunctionArgument::Index,
349                        ],
350                        array_coercion: Some(ListCoercion::FixedSizedListToList),
351                    }),
352                ],
353                Volatility::Immutable,
354            ),
355            aliases: vec![String::from("list_slice")],
356        }
357    }
358}
359
360impl ScalarUDFImpl for ArraySlice {
361    fn as_any(&self) -> &dyn Any {
362        self
363    }
364
365    fn display_name(&self, args: &[Expr]) -> Result<String> {
366        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
367        if let Some((arr, indexes)) = args_name.split_first() {
368            Ok(format!("{arr}[{}]", indexes.join(":")))
369        } else {
370            exec_err!("no argument")
371        }
372    }
373
374    fn schema_name(&self, args: &[Expr]) -> Result<String> {
375        let args_name = args
376            .iter()
377            .map(|e| e.schema_name().to_string())
378            .collect::<Vec<_>>();
379        if let Some((arr, indexes)) = args_name.split_first() {
380            Ok(format!("{arr}[{}]", indexes.join(":")))
381        } else {
382            exec_err!("no argument")
383        }
384    }
385
386    fn name(&self) -> &str {
387        "array_slice"
388    }
389
390    fn signature(&self) -> &Signature {
391        &self.signature
392    }
393
394    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
395        Ok(arg_types[0].clone())
396    }
397
398    fn invoke_with_args(
399        &self,
400        args: datafusion_expr::ScalarFunctionArgs,
401    ) -> Result<ColumnarValue> {
402        make_scalar_function(array_slice_inner)(&args.args)
403    }
404
405    fn aliases(&self) -> &[String] {
406        &self.aliases
407    }
408
409    fn documentation(&self) -> Option<&Documentation> {
410        self.doc()
411    }
412}
413
414/// array_slice SQL function
415///
416/// We follow the behavior of array_slice in DuckDB
417/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
418///
419/// > array_slice(array, from, to)
420///
421/// Positive index is treated as the index from the start of the array. If the
422/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
423/// length of the array, it is treated as the length of the array.
424///
425/// Negative index is treated as the index from the end of the array. If the index
426/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
427/// The `to` index is exclusive like python slice syntax.
428///
429/// See test cases in `array.slt` for more details.
430fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
431    let args_len = args.len();
432    if args_len != 3 && args_len != 4 {
433        return exec_err!("array_slice needs three or four arguments");
434    }
435
436    let stride = if args_len == 4 {
437        Some(as_int64_array(&args[3])?)
438    } else {
439        None
440    };
441
442    let from_array = as_int64_array(&args[1])?;
443    let to_array = as_int64_array(&args[2])?;
444
445    let array_data_type = args[0].data_type();
446    match array_data_type {
447        List(_) => {
448            let array = as_list_array(&args[0])?;
449            general_array_slice::<i32>(array, from_array, to_array, stride)
450        }
451        LargeList(_) => {
452            let array = as_large_list_array(&args[0])?;
453            general_array_slice::<i64>(array, from_array, to_array, stride)
454        }
455        ListView(_) => {
456            let array = as_list_view_array(&args[0])?;
457            general_list_view_array_slice::<i32>(array, from_array, to_array, stride)
458        }
459        LargeListView(_) => {
460            let array = as_large_list_view_array(&args[0])?;
461            general_list_view_array_slice::<i64>(array, from_array, to_array, stride)
462        }
463        _ => exec_err!("array_slice does not support type: {}", array_data_type),
464    }
465}
466
467fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
468where
469    i64: TryInto<O>,
470{
471    // 0 ~ len - 1
472    let adjusted_zero_index = if index < 0 {
473        if let Ok(index) = index.try_into() {
474            // When index < 0 and -index > length, index is clamped to the beginning of the list.
475            // Otherwise, when index < 0, the index is counted from the end of the list.
476            //
477            // Note, we actually test the contrapositive, index < -length, because negating a
478            // negative will panic if the negative is equal to the smallest representable value
479            // while negating a positive is always safe.
480            if index < (O::zero() - O::one()) * len {
481                O::zero()
482            } else {
483                index + len
484            }
485        } else {
486            return exec_err!("array_slice got invalid index: {}", index);
487        }
488    } else {
489        // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
490        if let Ok(index) = index.try_into() {
491            std::cmp::max(index - O::usize_as(1), O::usize_as(0))
492        } else {
493            return exec_err!("array_slice got invalid index: {}", index);
494        }
495    };
496
497    if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
498        Ok(Some(adjusted_zero_index))
499    } else {
500        // Out of bounds
501        Ok(None)
502    }
503}
504
505fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
506where
507    i64: TryInto<O>,
508{
509    // 0 ~ len - 1
510    let adjusted_zero_index = if index < 0 {
511        // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
512        if let Ok(index) = index.try_into() {
513            index + len
514        } else {
515            return exec_err!("array_slice got invalid index: {}", index);
516        }
517    } else {
518        // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
519        if let Ok(index) = index.try_into() {
520            std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
521        } else {
522            return exec_err!("array_slice got invalid index: {}", index);
523        }
524    };
525
526    if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
527        Ok(Some(adjusted_zero_index))
528    } else {
529        // Out of bounds
530        Ok(None)
531    }
532}
533
/// Internal plan describing how to materialize a single row's slice after
/// the slice bounds/stride have been normalized. Both list layouts consume
/// this to drive their copy logic.
///
/// All positions are 0-indexed and relative to the start of the row, so
/// callers add the row's own start offset before copying.
enum SlicePlan<O: OffsetSizeTrait> {
    /// No values should be produced.
    Empty,
    /// A contiguous run starting at `start` (relative to the row) with `len`
    /// elements can be copied in one go.
    Contiguous { start: O, len: O },
    /// Arbitrary positions (already relative to the row) must be copied in
    /// sequence, in the order they appear in the vector.
    Indices(Vec<O>),
}
547
548/// Produces a [`SlicePlan`] for the given logical slice parameters.
549fn compute_slice_plan<O: OffsetSizeTrait>(
550    len: O,
551    from_raw: i64,
552    to_raw: i64,
553    stride_raw: Option<i64>,
554) -> Result<SlicePlan<O>>
555where
556    i64: TryInto<O>,
557{
558    if len == O::usize_as(0) {
559        return Ok(SlicePlan::Empty);
560    }
561
562    let from_index = adjusted_from_index::<O>(from_raw, len)?;
563    let to_index = adjusted_to_index::<O>(to_raw, len)?;
564
565    let (Some(from), Some(to)) = (from_index, to_index) else {
566        return Ok(SlicePlan::Empty);
567    };
568
569    let stride_value = stride_raw.unwrap_or(1);
570    if stride_value == 0 {
571        return exec_err!(
572            "array_slice got invalid stride: {:?}, it cannot be 0",
573            stride_value
574        );
575    }
576
577    if (from < to && stride_value.is_negative())
578        || (from > to && stride_value.is_positive())
579    {
580        return Ok(SlicePlan::Empty);
581    }
582
583    let stride: O = stride_value.try_into().map_err(|_| {
584        internal_datafusion_err!("array_slice got invalid stride: {}", stride_value)
585    })?;
586
587    if from <= to && stride_value.is_positive() {
588        if stride_value == 1 {
589            let len = to - from + O::usize_as(1);
590            Ok(SlicePlan::Contiguous { start: from, len })
591        } else {
592            let mut indices = Vec::new();
593            let mut index = from;
594            while index <= to {
595                indices.push(index);
596                index += stride;
597            }
598            Ok(SlicePlan::Indices(indices))
599        }
600    } else {
601        let mut indices = Vec::new();
602        let mut index = from;
603        while index >= to {
604            indices.push(index);
605            index += stride;
606        }
607        Ok(SlicePlan::Indices(indices))
608    }
609}
610
611fn general_array_slice<O: OffsetSizeTrait>(
612    array: &GenericListArray<O>,
613    from_array: &Int64Array,
614    to_array: &Int64Array,
615    stride: Option<&Int64Array>,
616) -> Result<ArrayRef>
617where
618    i64: TryInto<O>,
619{
620    let values = array.values();
621    let original_data = values.to_data();
622    let capacity = Capacities::Array(original_data.len());
623
624    let mut mutable =
625        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
626
627    // We have the slice syntax compatible with DuckDB v0.8.1.
628    // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
629
630    let mut offsets = vec![O::usize_as(0)];
631    let mut null_builder = NullBufferBuilder::new(array.len());
632
633    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
634        let start = offset_window[0];
635        let end = offset_window[1];
636        let len = end - start;
637
638        // If any input is null, return null.
639        if array.is_null(row_index)
640            || from_array.is_null(row_index)
641            || to_array.is_null(row_index)
642            || stride.is_some_and(|s| s.is_null(row_index))
643        {
644            mutable.extend_nulls(1);
645            offsets.push(offsets[row_index] + O::usize_as(1));
646            null_builder.append_null();
647            continue;
648        }
649        null_builder.append_non_null();
650
651        // Empty arrays always return an empty array.
652        if len == O::usize_as(0) {
653            offsets.push(offsets[row_index]);
654            continue;
655        }
656
657        let slice_plan = compute_slice_plan::<O>(
658            len,
659            from_array.value(row_index),
660            to_array.value(row_index),
661            stride.map(|s| s.value(row_index)),
662        )?;
663
664        match slice_plan {
665            SlicePlan::Empty => offsets.push(offsets[row_index]),
666            SlicePlan::Contiguous {
667                start: rel_start,
668                len: slice_len,
669            } => {
670                let start_index = (start + rel_start).to_usize().unwrap();
671                let end_index = (start + rel_start + slice_len).to_usize().unwrap();
672                mutable.extend(0, start_index, end_index);
673                offsets.push(offsets[row_index] + slice_len);
674            }
675            SlicePlan::Indices(indices) => {
676                let count = indices.len();
677                for rel_index in indices {
678                    let absolute_index = (start + rel_index).to_usize().unwrap();
679                    mutable.extend(0, absolute_index, absolute_index + 1);
680                }
681                offsets.push(offsets[row_index] + O::usize_as(count));
682            }
683        }
684    }
685
686    let data = mutable.freeze();
687
688    Ok(Arc::new(GenericListArray::<O>::try_new(
689        Arc::new(Field::new_list_field(array.value_type(), true)),
690        OffsetBuffer::<O>::new(offsets.into()),
691        arrow::array::make_array(data),
692        null_builder.finish(),
693    )?))
694}
695
696fn general_list_view_array_slice<O: OffsetSizeTrait>(
697    array: &GenericListViewArray<O>,
698    from_array: &Int64Array,
699    to_array: &Int64Array,
700    stride: Option<&Int64Array>,
701) -> Result<ArrayRef>
702where
703    i64: TryInto<O>,
704{
705    let values = array.values();
706    let original_data = values.to_data();
707    let capacity = Capacities::Array(original_data.len());
708    let field = match array.data_type() {
709        ListView(field) | LargeListView(field) => Arc::clone(field),
710        other => {
711            return internal_err!("array_slice got unexpected data type: {}", other);
712        }
713    };
714
715    let mut mutable =
716        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
717
718    // We must build `offsets` and `sizes` buffers manually as ListView does not enforce
719    // monotonically increasing offsets.
720    let mut offsets = Vec::with_capacity(array.len());
721    let mut sizes = Vec::with_capacity(array.len());
722    let mut current_offset = O::usize_as(0);
723    let mut null_builder = NullBufferBuilder::new(array.len());
724
725    for row_index in 0..array.len() {
726        // Propagate NULL semantics: any NULL input yields a NULL output slot.
727        if array.is_null(row_index)
728            || from_array.is_null(row_index)
729            || to_array.is_null(row_index)
730            || stride.is_some_and(|s| s.is_null(row_index))
731        {
732            null_builder.append_null();
733            offsets.push(current_offset);
734            sizes.push(O::usize_as(0));
735            continue;
736        }
737        null_builder.append_non_null();
738
739        let len = array.value_size(row_index);
740
741        // Empty arrays always return an empty array.
742        if len == O::usize_as(0) {
743            offsets.push(current_offset);
744            sizes.push(O::usize_as(0));
745            continue;
746        }
747
748        let slice_plan = compute_slice_plan::<O>(
749            len,
750            from_array.value(row_index),
751            to_array.value(row_index),
752            stride.map(|s| s.value(row_index)),
753        )?;
754
755        let start = array.value_offset(row_index);
756        match slice_plan {
757            SlicePlan::Empty => {
758                offsets.push(current_offset);
759                sizes.push(O::usize_as(0));
760            }
761            SlicePlan::Contiguous {
762                start: rel_start,
763                len: slice_len,
764            } => {
765                let start_index = (start + rel_start).to_usize().unwrap();
766                let end_index = (start + rel_start + slice_len).to_usize().unwrap();
767                mutable.extend(0, start_index, end_index);
768                offsets.push(current_offset);
769                sizes.push(slice_len);
770                current_offset += slice_len;
771            }
772            SlicePlan::Indices(indices) => {
773                let count = indices.len();
774                for rel_index in indices {
775                    let absolute_index = (start + rel_index).to_usize().unwrap();
776                    mutable.extend(0, absolute_index, absolute_index + 1);
777                }
778                let length = O::usize_as(count);
779                offsets.push(current_offset);
780                sizes.push(length);
781                current_offset += length;
782            }
783        }
784    }
785
786    let data = mutable.freeze();
787
788    Ok(Arc::new(GenericListViewArray::<O>::try_new(
789        field,
790        ScalarBuffer::from(offsets),
791        ScalarBuffer::from(sizes),
792        arrow::array::make_array(data),
793        null_builder.finish(),
794    )?))
795}
796
797#[user_doc(
798    doc_section(label = "Array Functions"),
799    description = "Returns the array without the first element.",
800    syntax_example = "array_pop_front(array)",
801    sql_example = r#"```sql
802> select array_pop_front([1, 2, 3]);
803+-------------------------------+
804| array_pop_front(List([1,2,3])) |
805+-------------------------------+
806| [2, 3]                        |
807+-------------------------------+
808```"#,
809    argument(
810        name = "array",
811        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
812    )
813)]
814#[derive(Debug, PartialEq, Eq, Hash)]
815pub(super) struct ArrayPopFront {
816    signature: Signature,
817    aliases: Vec<String>,
818}
819
820impl ArrayPopFront {
821    pub fn new() -> Self {
822        Self {
823            signature: Signature::array(Volatility::Immutable),
824            aliases: vec![String::from("list_pop_front")],
825        }
826    }
827}
828
829impl ScalarUDFImpl for ArrayPopFront {
830    fn as_any(&self) -> &dyn Any {
831        self
832    }
833    fn name(&self) -> &str {
834        "array_pop_front"
835    }
836
837    fn signature(&self) -> &Signature {
838        &self.signature
839    }
840
841    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
842        Ok(arg_types[0].clone())
843    }
844
845    fn invoke_with_args(
846        &self,
847        args: datafusion_expr::ScalarFunctionArgs,
848    ) -> Result<ColumnarValue> {
849        make_scalar_function(array_pop_front_inner)(&args.args)
850    }
851
852    fn aliases(&self) -> &[String] {
853        &self.aliases
854    }
855
856    fn documentation(&self) -> Option<&Documentation> {
857        self.doc()
858    }
859}
860
861/// array_pop_front SQL function
862fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
863    let array_data_type = args[0].data_type();
864    match array_data_type {
865        List(_) => {
866            let array = as_list_array(&args[0])?;
867            general_pop_front_list::<i32>(array)
868        }
869        LargeList(_) => {
870            let array = as_large_list_array(&args[0])?;
871            general_pop_front_list::<i64>(array)
872        }
873        _ => exec_err!("array_pop_front does not support type: {}", array_data_type),
874    }
875}
876
877fn general_pop_front_list<O: OffsetSizeTrait>(
878    array: &GenericListArray<O>,
879) -> Result<ArrayRef>
880where
881    i64: TryInto<O>,
882{
883    let from_array = Int64Array::from(vec![2; array.len()]);
884    let to_array = Int64Array::from(
885        array
886            .iter()
887            .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
888            .collect::<Vec<i64>>(),
889    );
890    general_array_slice::<O>(array, &from_array, &to_array, None)
891}
892
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns the array without the last element.",
    syntax_example = "array_pop_back(array)",
    sql_example = r#"```sql
> select array_pop_back([1, 2, 3]);
+-------------------------------+
| array_pop_back(List([1,2,3])) |
+-------------------------------+
| [1, 2]                        |
+-------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
// Scalar UDF behind `array_pop_back(array)`; the execution logic lives in
// `array_pop_back_inner`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct ArrayPopBack {
    // Accepted argument types (a single array).
    signature: Signature,
    // Alternative SQL names: `list_pop_back`.
    aliases: Vec<String>,
}
915
916impl ArrayPopBack {
917    pub fn new() -> Self {
918        Self {
919            signature: Signature::array(Volatility::Immutable),
920            aliases: vec![String::from("list_pop_back")],
921        }
922    }
923}
924
925impl ScalarUDFImpl for ArrayPopBack {
926    fn as_any(&self) -> &dyn Any {
927        self
928    }
929    fn name(&self) -> &str {
930        "array_pop_back"
931    }
932
933    fn signature(&self) -> &Signature {
934        &self.signature
935    }
936
937    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
938        Ok(arg_types[0].clone())
939    }
940
941    fn invoke_with_args(
942        &self,
943        args: datafusion_expr::ScalarFunctionArgs,
944    ) -> Result<ColumnarValue> {
945        make_scalar_function(array_pop_back_inner)(&args.args)
946    }
947
948    fn aliases(&self) -> &[String] {
949        &self.aliases
950    }
951
952    fn documentation(&self) -> Option<&Documentation> {
953        self.doc()
954    }
955}
956
957/// array_pop_back SQL function
958fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
959    let [array] = take_function_args("array_pop_back", args)?;
960
961    match array.data_type() {
962        List(_) => {
963            let array = as_list_array(&array)?;
964            general_pop_back_list::<i32>(array)
965        }
966        LargeList(_) => {
967            let array = as_large_list_array(&array)?;
968            general_pop_back_list::<i64>(array)
969        }
970        _ => exec_err!(
971            "array_pop_back does not support type: {}",
972            array.data_type()
973        ),
974    }
975}
976
977fn general_pop_back_list<O: OffsetSizeTrait>(
978    array: &GenericListArray<O>,
979) -> Result<ArrayRef>
980where
981    i64: TryInto<O>,
982{
983    let from_array = Int64Array::from(vec![1; array.len()]);
984    let to_array = Int64Array::from(
985        array
986            .iter()
987            .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
988            .collect::<Vec<i64>>(),
989    );
990    general_array_slice::<O>(array, &from_array, &to_array, None)
991}
992
993#[user_doc(
994    doc_section(label = "Array Functions"),
995    description = "Returns the first non-null element in the array.",
996    syntax_example = "array_any_value(array)",
997    sql_example = r#"```sql
998> select array_any_value([NULL, 1, 2, 3]);
999+-------------------------------+
1000| array_any_value(List([NULL,1,2,3])) |
1001+-------------------------------------+
1002| 1                                   |
1003+-------------------------------------+
1004```"#,
1005    argument(
1006        name = "array",
1007        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
1008    )
1009)]
1010#[derive(Debug, PartialEq, Eq, Hash)]
1011pub(super) struct ArrayAnyValue {
1012    signature: Signature,
1013    aliases: Vec<String>,
1014}
1015
1016impl ArrayAnyValue {
1017    pub fn new() -> Self {
1018        Self {
1019            signature: Signature::array(Volatility::Immutable),
1020            aliases: vec![String::from("list_any_value")],
1021        }
1022    }
1023}
1024
1025impl ScalarUDFImpl for ArrayAnyValue {
1026    fn as_any(&self) -> &dyn Any {
1027        self
1028    }
1029    fn name(&self) -> &str {
1030        "array_any_value"
1031    }
1032    fn signature(&self) -> &Signature {
1033        &self.signature
1034    }
1035    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1036        match &arg_types[0] {
1037            List(field) | LargeList(field) | FixedSizeList(field, _) => {
1038                Ok(field.data_type().clone())
1039            }
1040            _ => plan_err!(
1041                "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
1042            ),
1043        }
1044    }
1045
1046    fn invoke_with_args(
1047        &self,
1048        args: datafusion_expr::ScalarFunctionArgs,
1049    ) -> Result<ColumnarValue> {
1050        make_scalar_function(array_any_value_inner)(&args.args)
1051    }
1052
1053    fn aliases(&self) -> &[String] {
1054        &self.aliases
1055    }
1056
1057    fn documentation(&self) -> Option<&Documentation> {
1058        self.doc()
1059    }
1060}
1061
1062fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
1063    let [array] = take_function_args("array_any_value", args)?;
1064
1065    match &array.data_type() {
1066        List(_) => {
1067            let array = as_list_array(&array)?;
1068            general_array_any_value::<i32>(array)
1069        }
1070        LargeList(_) => {
1071            let array = as_large_list_array(&array)?;
1072            general_array_any_value::<i64>(array)
1073        }
1074        data_type => exec_err!("array_any_value does not support type: {data_type}"),
1075    }
1076}
1077
1078fn general_array_any_value<O: OffsetSizeTrait>(
1079    array: &GenericListArray<O>,
1080) -> Result<ArrayRef>
1081where
1082    i64: TryInto<O>,
1083{
1084    let values = array.values();
1085    let original_data = values.to_data();
1086    let capacity = Capacities::Array(array.len());
1087
1088    let mut mutable =
1089        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
1090
1091    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
1092        let start = offset_window[0];
1093        let end = offset_window[1];
1094        let len = end - start;
1095
1096        // array is null
1097        if len == O::usize_as(0) {
1098            mutable.extend_nulls(1);
1099            continue;
1100        }
1101
1102        let row_value = array.value(row_index);
1103        match row_value.nulls() {
1104            Some(row_nulls_buffer) => {
1105                // nulls are present in the array so try to take the first valid element
1106                if let Some(first_non_null_index) =
1107                    row_nulls_buffer.valid_indices().next()
1108                {
1109                    let index = start.as_usize() + first_non_null_index;
1110                    mutable.extend(0, index, index + 1)
1111                } else {
1112                    // all the elements in the array are null
1113                    mutable.extend_nulls(1);
1114                }
1115            }
1116            None => {
1117                // no nulls are present in the array so take the first element
1118                let index = start.as_usize();
1119                mutable.extend(0, index, index + 1);
1120            }
1121        }
1122    }
1123
1124    let data = mutable.freeze();
1125    Ok(arrow::array::make_array(data))
1126}
1127
1128#[cfg(test)]
1129mod tests {
1130    use super::{array_element_udf, general_list_view_array_slice};
1131    use arrow::array::{
1132        Array, ArrayRef, GenericListViewArray, Int32Array, Int64Array, ListViewArray,
1133        cast::AsArray,
1134    };
1135    use arrow::buffer::ScalarBuffer;
1136    use arrow::datatypes::{DataType, Field};
1137    use datafusion_common::{Column, DFSchema, Result};
1138    use datafusion_expr::expr::ScalarFunction;
1139    use datafusion_expr::{Expr, ExprSchemable};
1140    use std::collections::HashMap;
1141    use std::sync::Arc;
1142
1143    fn list_view_values(array: &GenericListViewArray<i32>) -> Vec<Vec<i32>> {
1144        (0..array.len())
1145            .map(|i| {
1146                let child = array.value(i);
1147                let values = child.as_any().downcast_ref::<Int32Array>().unwrap();
1148                values.iter().map(|v| v.unwrap()).collect()
1149            })
1150            .collect()
1151    }
1152
1153    // Regression test for https://github.com/apache/datafusion/issues/13755
1154    #[test]
1155    fn test_array_element_return_type_fixed_size_list() {
1156        let fixed_size_list_type = DataType::FixedSizeList(
1157            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
1158            13,
1159        );
1160        let array_type = DataType::List(
1161            Field::new_list_field(fixed_size_list_type.clone(), true).into(),
1162        );
1163        let index_type = DataType::Int64;
1164
1165        let schema = DFSchema::from_unqualified_fields(
1166            vec![
1167                Field::new("my_array", array_type.clone(), false),
1168                Field::new("my_index", index_type.clone(), false),
1169            ]
1170            .into(),
1171            HashMap::default(),
1172        )
1173        .unwrap();
1174
1175        let udf = array_element_udf();
1176
1177        // ScalarUDFImpl::return_type
1178        assert_eq!(
1179            udf.return_type(&[array_type.clone(), index_type.clone()])
1180                .unwrap(),
1181            fixed_size_list_type
1182        );
1183
1184        // Via ExprSchemable::get_type (e.g. SimplifyInfo)
1185        let udf_expr = Expr::ScalarFunction(ScalarFunction {
1186            func: array_element_udf(),
1187            args: vec![
1188                Expr::Column(Column::new_unqualified("my_array")),
1189                Expr::Column(Column::new_unqualified("my_index")),
1190            ],
1191        });
1192        assert_eq!(
1193            ExprSchemable::get_type(&udf_expr, &schema).unwrap(),
1194            fixed_size_list_type
1195        );
1196    }
1197
1198    #[test]
1199    fn test_array_slice_list_view_basic() -> Result<()> {
1200        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1201        let offsets = ScalarBuffer::from(vec![0, 3]);
1202        let sizes = ScalarBuffer::from(vec![3, 2]);
1203        let field = Arc::new(Field::new("item", DataType::Int32, true));
1204        let array = ListViewArray::new(field, offsets, sizes, values, None);
1205
1206        let from = Int64Array::from(vec![2, 1]);
1207        let to = Int64Array::from(vec![3, 2]);
1208
1209        let result = general_list_view_array_slice::<i32>(
1210            &array,
1211            &from,
1212            &to,
1213            None::<&Int64Array>,
1214        )?;
1215        let result = result.as_ref().as_list_view::<i32>();
1216
1217        assert_eq!(list_view_values(result), vec![vec![2, 3], vec![4, 5]]);
1218        Ok(())
1219    }
1220
1221    #[test]
1222    fn test_array_slice_list_view_non_monotonic_offsets() -> Result<()> {
1223        // First list references the tail of the values buffer, second list references the head.
1224        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1225        let offsets = ScalarBuffer::from(vec![3, 0]);
1226        let sizes = ScalarBuffer::from(vec![2, 3]);
1227        let field = Arc::new(Field::new("item", DataType::Int32, true));
1228        let array = ListViewArray::new(field, offsets, sizes, values, None);
1229
1230        let from = Int64Array::from(vec![1, 1]);
1231        let to = Int64Array::from(vec![2, 2]);
1232
1233        let result = general_list_view_array_slice::<i32>(
1234            &array,
1235            &from,
1236            &to,
1237            None::<&Int64Array>,
1238        )?;
1239        let result = result.as_ref().as_list_view::<i32>();
1240
1241        assert_eq!(list_view_values(result), vec![vec![4, 5], vec![1, 2]]);
1242        Ok(())
1243    }
1244
1245    #[test]
1246    fn test_array_slice_list_view_negative_stride() -> Result<()> {
1247        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1248        let offsets = ScalarBuffer::from(vec![0, 3]);
1249        let sizes = ScalarBuffer::from(vec![3, 2]);
1250        let field = Arc::new(Field::new("item", DataType::Int32, true));
1251        let array = ListViewArray::new(field, offsets, sizes, values, None);
1252
1253        let from = Int64Array::from(vec![3, 2]);
1254        let to = Int64Array::from(vec![1, 1]);
1255        let stride = Int64Array::from(vec![-1, -1]);
1256
1257        let result =
1258            general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?;
1259        let result = result.as_ref().as_list_view::<i32>();
1260
1261        assert_eq!(list_view_values(result), vec![vec![3, 2, 1], vec![5, 4]]);
1262        Ok(())
1263    }
1264
1265    #[test]
1266    fn test_array_slice_list_view_out_of_order() -> Result<()> {
1267        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1268        let offsets = ScalarBuffer::from(vec![3, 1, 0]);
1269        let sizes = ScalarBuffer::from(vec![2, 2, 1]);
1270        let field = Arc::new(Field::new("item", DataType::Int32, true));
1271        let array = ListViewArray::new(field, offsets, sizes, values, None);
1272        assert_eq!(
1273            list_view_values(&array),
1274            vec![vec![4, 5], vec![2, 3], vec![1]]
1275        );
1276
1277        let from = Int64Array::from(vec![2, 2, 2]);
1278        let to = Int64Array::from(vec![1, 1, 1]);
1279        let stride = Int64Array::from(vec![-1, -1, -1]);
1280
1281        let result =
1282            general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?;
1283        let result = result.as_ref().as_list_view::<i32>();
1284
1285        assert_eq!(
1286            list_view_values(result),
1287            vec![vec![5, 4], vec![3, 2], vec![]]
1288        );
1289        Ok(())
1290    }
1291
1292    #[test]
1293    fn test_array_slice_list_view_with_nulls() -> Result<()> {
1294        let values: ArrayRef = Arc::new(Int32Array::from(vec![
1295            Some(1),
1296            None,
1297            Some(3),
1298            Some(4),
1299            Some(5),
1300        ]));
1301        let offsets = ScalarBuffer::from(vec![0, 2, 5]);
1302        let sizes = ScalarBuffer::from(vec![2, 3, 0]);
1303        let field = Arc::new(Field::new("item", DataType::Int32, true));
1304        let array = ListViewArray::new(field, offsets, sizes, values, None);
1305
1306        let from = Int64Array::from(vec![1, 1, 1]);
1307        let to = Int64Array::from(vec![2, 2, 1]);
1308
1309        let result = general_list_view_array_slice::<i32>(&array, &from, &to, None)?;
1310        let result = result.as_ref().as_list_view::<i32>();
1311
1312        let actual: Vec<Vec<Option<i32>>> = (0..result.len())
1313            .map(|i| {
1314                result
1315                    .value(i)
1316                    .as_any()
1317                    .downcast_ref::<Int32Array>()
1318                    .unwrap()
1319                    .iter()
1320                    .collect()
1321            })
1322            .collect();
1323
1324        assert_eq!(
1325            actual,
1326            vec![vec![Some(1), None], vec![Some(3), Some(4)], Vec::new(),]
1327        );
1328
1329        // Test with NULL stride - should return NULL for rows with NULL stride
1330        let stride_with_null = Int64Array::from(vec![Some(1), None, Some(1)]);
1331        let result = general_list_view_array_slice::<i32>(
1332            &array,
1333            &from,
1334            &to,
1335            Some(&stride_with_null),
1336        )?;
1337        let result = result.as_ref().as_list_view::<i32>();
1338
1339        // First row: stride = 1, should return [1, None]
1340        // Second row: stride = NULL, should return NULL
1341        // Third row: stride = 1, empty array should return empty
1342        assert!(!result.is_null(0)); // First row should not be null
1343        assert!(result.is_null(1)); // Second row should be null (stride is NULL)
1344        assert!(!result.is_null(2)); // Third row should not be null
1345
1346        let first_row: Vec<Option<i32>> = result
1347            .value(0)
1348            .as_any()
1349            .downcast_ref::<Int32Array>()
1350            .unwrap()
1351            .iter()
1352            .collect();
1353        assert_eq!(first_row, vec![Some(1), None]);
1354
1355        Ok(())
1356    }
1357}