Skip to main content

datafusion_functions_nested/
extract.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions.
19
20use arrow::array::{
21    Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray, Int64Array,
22    MutableArrayData, NullArray, OffsetSizeTrait,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27    DataType::{FixedSizeList, LargeList, LargeListView, List, ListView, Null},
28    Field,
29};
30use datafusion_common::cast::as_large_list_array;
31use datafusion_common::cast::as_list_array;
32use datafusion_common::cast::{
33    as_int64_array, as_large_list_view_array, as_list_view_array,
34};
35use datafusion_common::internal_err;
36use datafusion_common::utils::ListCoercion;
37use datafusion_common::{
38    Result, exec_datafusion_err, exec_err, internal_datafusion_err, plan_err,
39    utils::take_function_args,
40};
41use datafusion_expr::{
42    ArrayFunctionArgument, ArrayFunctionSignature, Expr, ScalarFunctionArgs,
43    TypeSignature,
44};
45use datafusion_expr::{
46    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
47};
48use datafusion_macros::user_doc;
49use std::sync::Arc;
50
51use crate::utils::make_scalar_function;
52
// Create static instances of ScalarUDFs for each function
//
// NOTE(review): the items these macros expand to are defined in the crate's
// macro module, which is not visible here. Judging by the arguments, each
// `make_udf_expr_and_func!` call names the UDF struct, the expression-builder
// function, that function's argument list, its doc string and the UDF
// accessor function — confirm against the macro definition.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `array_slice` has an optional `stride` argument, so its expression helper
// (`array_slice`, taking `stride: Option<Expr>`) is written by hand further
// down; presumably this macro only generates the `array_slice_udf` accessor
// that the helper calls.
create_func!(ArraySlice, array_slice_udf);

// `array_pop_front(array)`: the array without its first element.
make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

// `array_pop_back(array)`: the array without its last element.
make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

// `array_any_value(array)`: the first non-null element of the array.
make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
87
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Extracts the element with the index n from the array.",
    syntax_example = "array_element(array, index)",
    sql_example = r#"```sql
> select array_element([1, 2, 3, 4], 3);
+-----------------------------------------+
| array_element(List([1,2,3,4]),Int64(3)) |
+-----------------------------------------+
| 3                                       |
+-----------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "index",
        description = "Index to extract the element from the array."
    )
)]
/// Scalar UDF implementing `array_element(array, index)` and its aliases
/// `array_extract`, `list_element` and `list_extract`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayElement {
    /// Accepted argument types (built in [`ArrayElement::new`]).
    signature: Signature,
    /// Alternative SQL names under which this function can be called.
    aliases: Vec<String>,
}
109#[derive(Debug, PartialEq, Eq, Hash)]
110pub struct ArrayElement {
111    signature: Signature,
112    aliases: Vec<String>,
113}
114
115impl Default for ArrayElement {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl ArrayElement {
122    pub fn new() -> Self {
123        Self {
124            signature: Signature::array_and_index(Volatility::Immutable),
125            aliases: vec![
126                String::from("array_extract"),
127                String::from("list_element"),
128                String::from("list_extract"),
129            ],
130        }
131    }
132}
133
134impl ScalarUDFImpl for ArrayElement {
135    fn name(&self) -> &str {
136        "array_element"
137    }
138
139    fn display_name(&self, args: &[Expr]) -> Result<String> {
140        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141        if args_name.len() != 2 {
142            return exec_err!("expect 2 args, got {}", args_name.len());
143        }
144
145        Ok(format!("{}[{}]", args_name[0], args_name[1]))
146    }
147
148    fn schema_name(&self, args: &[Expr]) -> Result<String> {
149        let args_name = args
150            .iter()
151            .map(|e| e.schema_name().to_string())
152            .collect::<Vec<_>>();
153        if args_name.len() != 2 {
154            return exec_err!("expect 2 args, got {}", args_name.len());
155        }
156
157        Ok(format!("{}[{}]", args_name[0], args_name[1]))
158    }
159
160    fn signature(&self) -> &Signature {
161        &self.signature
162    }
163
164    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165        match &arg_types[0] {
166            Null => Ok(Null),
167            List(field) | LargeList(field) => Ok(field.data_type().clone()),
168            arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
169        }
170    }
171
172    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
173        make_scalar_function(array_element_inner)(&args.args)
174    }
175
176    fn aliases(&self) -> &[String] {
177        &self.aliases
178    }
179
180    fn documentation(&self) -> Option<&Documentation> {
181        self.doc()
182    }
183}
184
185/// array_element SQL function
186///
187/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
188/// `array_element(array, index)`
189///
190/// For example:
191/// > array_element(\[1, 2, 3], 2) -> 2
192fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
193    let [array, indexes] = take_function_args("array_element", args)?;
194
195    match &array.data_type() {
196        Null => Ok(Arc::new(NullArray::new(array.len()))),
197        List(_) => {
198            let array = as_list_array(&array)?;
199            let indexes = as_int64_array(&indexes)?;
200            general_array_element::<i32>(array, indexes)
201        }
202        LargeList(_) => {
203            let array = as_large_list_array(&array)?;
204            let indexes = as_int64_array(&indexes)?;
205            general_array_element::<i64>(array, indexes)
206        }
207        arg_type => {
208            exec_err!("array_element does not support type {arg_type}")
209        }
210    }
211}
212
213fn general_array_element<O: OffsetSizeTrait>(
214    array: &GenericListArray<O>,
215    indexes: &Int64Array,
216) -> Result<ArrayRef>
217where
218    i64: TryInto<O>,
219{
220    let values = array.values();
221    if values.data_type().is_null() {
222        return Ok(Arc::new(NullArray::new(array.len())));
223    }
224
225    let original_data = values.to_data();
226    let capacity = Capacities::Array(original_data.len());
227
228    // use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
229    let mut mutable =
230        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
231
232    fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
233    where
234        i64: TryInto<O>,
235    {
236        let index: O = index.try_into().map_err(|_| {
237            exec_datafusion_err!("array_element got invalid index: {index}")
238        })?;
239        // 0 ~ len - 1
240        let adjusted_zero_index = if index < O::usize_as(0) {
241            index + len
242        } else {
243            index - O::usize_as(1)
244        };
245
246        if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
247            Ok(Some(adjusted_zero_index))
248        } else {
249            // Out of bounds
250            Ok(None)
251        }
252    }
253
254    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
255        let start = offset_window[0];
256        let end = offset_window[1];
257        let len = end - start;
258
259        // array is null
260        if array.is_null(row_index) {
261            mutable.extend_nulls(1);
262            continue;
263        }
264
265        let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
266
267        if let Some(index) = index {
268            let start = start.as_usize() + index.as_usize();
269            mutable.extend(0, start, start + 1_usize);
270        } else {
271            // Index out of bounds
272            mutable.extend_nulls(1);
273        }
274    }
275
276    let data = mutable.freeze();
277    Ok(arrow::array::make_array(data))
278}
279
280#[doc = "returns a slice of the array."]
281pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
282    let args = match stride {
283        Some(stride) => vec![array, begin, end, stride],
284        None => vec![array, begin, end],
285    };
286    array_slice_udf().call(args)
287}
288
289#[user_doc(
290    doc_section(label = "Array Functions"),
291    description = "Returns a slice of the array based on 1-indexed start and end positions.",
292    syntax_example = "array_slice(array, begin, end)",
293    sql_example = r#"```sql
294> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
295+--------------------------------------------------------+
296| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
297+--------------------------------------------------------+
298| [3, 4, 5, 6]                                           |
299+--------------------------------------------------------+
300```"#,
301    argument(
302        name = "array",
303        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
304    ),
305    argument(
306        name = "begin",
307        description = "Index of the first element. If negative, it counts backward from the end of the array."
308    ),
309    argument(
310        name = "end",
311        description = "Index of the last element. If negative, it counts backward from the end of the array."
312    ),
313    argument(
314        name = "stride",
315        description = "Stride of the array slice. The default is 1."
316    )
317)]
318#[derive(Debug, PartialEq, Eq, Hash)]
319pub(super) struct ArraySlice {
320    signature: Signature,
321    aliases: Vec<String>,
322}
323
324impl ArraySlice {
325    pub fn new() -> Self {
326        Self {
327            signature: Signature::one_of(
328                vec![
329                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
330                        arguments: vec![
331                            ArrayFunctionArgument::Array,
332                            ArrayFunctionArgument::Index,
333                            ArrayFunctionArgument::Index,
334                        ],
335                        array_coercion: Some(ListCoercion::FixedSizedListToList),
336                    }),
337                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
338                        arguments: vec![
339                            ArrayFunctionArgument::Array,
340                            ArrayFunctionArgument::Index,
341                            ArrayFunctionArgument::Index,
342                            ArrayFunctionArgument::Index,
343                        ],
344                        array_coercion: Some(ListCoercion::FixedSizedListToList),
345                    }),
346                ],
347                Volatility::Immutable,
348            ),
349            aliases: vec![String::from("list_slice")],
350        }
351    }
352}
353
354impl ScalarUDFImpl for ArraySlice {
355    fn display_name(&self, args: &[Expr]) -> Result<String> {
356        let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
357        if let Some((arr, indexes)) = args_name.split_first() {
358            Ok(format!("{arr}[{}]", indexes.join(":")))
359        } else {
360            exec_err!("no argument")
361        }
362    }
363
364    fn schema_name(&self, args: &[Expr]) -> Result<String> {
365        let args_name = args
366            .iter()
367            .map(|e| e.schema_name().to_string())
368            .collect::<Vec<_>>();
369        if let Some((arr, indexes)) = args_name.split_first() {
370            Ok(format!("{arr}[{}]", indexes.join(":")))
371        } else {
372            exec_err!("no argument")
373        }
374    }
375
376    fn name(&self) -> &str {
377        "array_slice"
378    }
379
380    fn signature(&self) -> &Signature {
381        &self.signature
382    }
383
384    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
385        Ok(arg_types[0].clone())
386    }
387
388    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
389        make_scalar_function(array_slice_inner)(&args.args)
390    }
391
392    fn aliases(&self) -> &[String] {
393        &self.aliases
394    }
395
396    fn documentation(&self) -> Option<&Documentation> {
397        self.doc()
398    }
399}
400
401/// array_slice SQL function
402///
403/// We follow the behavior of array_slice in DuckDB
404/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
405///
406/// > array_slice(array, from, to)
407///
408/// Positive index is treated as the index from the start of the array. If the
409/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
410/// length of the array, it is treated as the length of the array.
411///
412/// Negative index is treated as the index from the end of the array. If the index
413/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
414/// The `to` index is exclusive like python slice syntax.
415///
416/// See test cases in `array.slt` for more details.
417fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
418    let args_len = args.len();
419    if args_len != 3 && args_len != 4 {
420        return exec_err!("array_slice needs three or four arguments");
421    }
422
423    let stride = if args_len == 4 {
424        Some(as_int64_array(&args[3])?)
425    } else {
426        None
427    };
428
429    let from_array = as_int64_array(&args[1])?;
430    let to_array = as_int64_array(&args[2])?;
431
432    let array_data_type = args[0].data_type();
433    match array_data_type {
434        List(_) => {
435            let array = as_list_array(&args[0])?;
436            general_array_slice::<i32>(array, from_array, to_array, stride)
437        }
438        LargeList(_) => {
439            let array = as_large_list_array(&args[0])?;
440            general_array_slice::<i64>(array, from_array, to_array, stride)
441        }
442        ListView(_) => {
443            let array = as_list_view_array(&args[0])?;
444            general_list_view_array_slice::<i32>(array, from_array, to_array, stride)
445        }
446        LargeListView(_) => {
447            let array = as_large_list_view_array(&args[0])?;
448            general_list_view_array_slice::<i64>(array, from_array, to_array, stride)
449        }
450        _ => exec_err!("array_slice does not support type: {}", array_data_type),
451    }
452}
453
454fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
455where
456    i64: TryInto<O>,
457{
458    // 0 ~ len - 1
459    let adjusted_zero_index = if index < 0 {
460        if let Ok(index) = index.try_into() {
461            // When index < 0 and -index > length, index is clamped to the beginning of the list.
462            // Otherwise, when index < 0, the index is counted from the end of the list.
463            //
464            // Note, we actually test the contrapositive, index < -length, because negating a
465            // negative will panic if the negative is equal to the smallest representable value
466            // while negating a positive is always safe.
467            if index < (O::zero() - O::one()) * len {
468                O::zero()
469            } else {
470                index + len
471            }
472        } else {
473            return exec_err!("array_slice got invalid index: {}", index);
474        }
475    } else {
476        // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
477        if let Ok(index) = index.try_into() {
478            std::cmp::max(index - O::usize_as(1), O::usize_as(0))
479        } else {
480            return exec_err!("array_slice got invalid index: {}", index);
481        }
482    };
483
484    if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
485        Ok(Some(adjusted_zero_index))
486    } else {
487        // Out of bounds
488        Ok(None)
489    }
490}
491
492fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
493where
494    i64: TryInto<O>,
495{
496    // 0 ~ len - 1
497    let adjusted_zero_index = if index < 0 {
498        // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
499        if let Ok(index) = index.try_into() {
500            index + len
501        } else {
502            return exec_err!("array_slice got invalid index: {}", index);
503        }
504    } else {
505        // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
506        if let Ok(index) = index.try_into() {
507            std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
508        } else {
509            return exec_err!("array_slice got invalid index: {}", index);
510        }
511    };
512
513    if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
514        Ok(Some(adjusted_zero_index))
515    } else {
516        // Out of bounds
517        Ok(None)
518    }
519}
520
521/// Internal plan describing how to materialize a single row's slice after
522/// the slice bounds/stride have been normalized. Both list layouts consume
523/// this to drive their copy logic.
524enum SlicePlan<O: OffsetSizeTrait> {
525    /// No values should be produced.
526    Empty,
527    /// A contiguous run starting at `start` (relative to the row) with `len`
528    /// elements can be copied in one go.
529    Contiguous { start: O, len: O },
530    /// Arbitrary positions (already relative to the row) must be copied in
531    /// sequence.
532    Indices(Vec<O>),
533}
534
535/// Produces a [`SlicePlan`] for the given logical slice parameters.
536fn compute_slice_plan<O: OffsetSizeTrait>(
537    len: O,
538    from_raw: i64,
539    to_raw: i64,
540    stride_raw: Option<i64>,
541) -> Result<SlicePlan<O>>
542where
543    i64: TryInto<O>,
544{
545    if len == O::usize_as(0) {
546        return Ok(SlicePlan::Empty);
547    }
548
549    let from_index = adjusted_from_index::<O>(from_raw, len)?;
550    let to_index = adjusted_to_index::<O>(to_raw, len)?;
551
552    let (Some(from), Some(to)) = (from_index, to_index) else {
553        return Ok(SlicePlan::Empty);
554    };
555
556    let stride_value = stride_raw.unwrap_or(1);
557    if stride_value == 0 {
558        return exec_err!(
559            "array_slice got invalid stride: {:?}, it cannot be 0",
560            stride_value
561        );
562    }
563
564    if (from < to && stride_value.is_negative())
565        || (from > to && stride_value.is_positive())
566    {
567        return Ok(SlicePlan::Empty);
568    }
569
570    let stride: O = stride_value.try_into().map_err(|_| {
571        internal_datafusion_err!("array_slice got invalid stride: {}", stride_value)
572    })?;
573
574    if from <= to && stride_value.is_positive() {
575        if stride_value == 1 {
576            let len = to - from + O::usize_as(1);
577            Ok(SlicePlan::Contiguous { start: from, len })
578        } else {
579            let mut indices = Vec::new();
580            let mut index = from;
581            while index <= to {
582                indices.push(index);
583                index += stride;
584            }
585            Ok(SlicePlan::Indices(indices))
586        }
587    } else {
588        let mut indices = Vec::new();
589        let mut index = from;
590        while index >= to {
591            indices.push(index);
592            index += stride;
593        }
594        Ok(SlicePlan::Indices(indices))
595    }
596}
597
598/// Combine null bitmaps from all slice inputs into a single mask.
599fn combine_input_nulls(
600    array: &dyn Array,
601    from_array: &Int64Array,
602    to_array: &Int64Array,
603    stride: Option<&Int64Array>,
604) -> Option<NullBuffer> {
605    NullBuffer::union_many([
606        array.nulls(),
607        from_array.nulls(),
608        to_array.nulls(),
609        stride.and_then(|s| s.nulls()),
610    ])
611}
612
613fn general_array_slice<O: OffsetSizeTrait>(
614    array: &GenericListArray<O>,
615    from_array: &Int64Array,
616    to_array: &Int64Array,
617    stride: Option<&Int64Array>,
618) -> Result<ArrayRef>
619where
620    i64: TryInto<O>,
621{
622    let values = array.values();
623    let original_data = values.to_data();
624    let capacity = Capacities::Array(original_data.len());
625
626    let mut mutable =
627        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
628
629    // We have the slice syntax compatible with DuckDB v0.8.1.
630    // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
631
632    let mut offsets = vec![O::usize_as(0)];
633
634    let nulls = combine_input_nulls(array, from_array, to_array, stride);
635
636    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
637        let start = offset_window[0];
638        let end = offset_window[1];
639        let len = end - start;
640
641        if nulls.as_ref().is_some_and(|n| n.is_null(row_index)) {
642            mutable.extend_nulls(1);
643            offsets.push(offsets[row_index] + O::usize_as(1));
644            continue;
645        }
646
647        // Empty arrays always return an empty array.
648        if len == O::usize_as(0) {
649            offsets.push(offsets[row_index]);
650            continue;
651        }
652
653        let slice_plan = compute_slice_plan::<O>(
654            len,
655            from_array.value(row_index),
656            to_array.value(row_index),
657            stride.map(|s| s.value(row_index)),
658        )?;
659
660        match slice_plan {
661            SlicePlan::Empty => offsets.push(offsets[row_index]),
662            SlicePlan::Contiguous {
663                start: rel_start,
664                len: slice_len,
665            } => {
666                let start_index = (start + rel_start).to_usize().unwrap();
667                let end_index = (start + rel_start + slice_len).to_usize().unwrap();
668                mutable.extend(0, start_index, end_index);
669                offsets.push(offsets[row_index] + slice_len);
670            }
671            SlicePlan::Indices(indices) => {
672                let count = indices.len();
673                for rel_index in indices {
674                    let absolute_index = (start + rel_index).to_usize().unwrap();
675                    mutable.extend(0, absolute_index, absolute_index + 1);
676                }
677                offsets.push(offsets[row_index] + O::usize_as(count));
678            }
679        }
680    }
681
682    let data = mutable.freeze();
683
684    Ok(Arc::new(GenericListArray::<O>::try_new(
685        Arc::new(Field::new_list_field(array.value_type(), true)),
686        OffsetBuffer::<O>::new(offsets.into()),
687        arrow::array::make_array(data),
688        nulls,
689    )?))
690}
691
692fn general_list_view_array_slice<O: OffsetSizeTrait>(
693    array: &GenericListViewArray<O>,
694    from_array: &Int64Array,
695    to_array: &Int64Array,
696    stride: Option<&Int64Array>,
697) -> Result<ArrayRef>
698where
699    i64: TryInto<O>,
700{
701    let values = array.values();
702    let original_data = values.to_data();
703    let capacity = Capacities::Array(original_data.len());
704    let field = match array.data_type() {
705        ListView(field) | LargeListView(field) => Arc::clone(field),
706        other => {
707            return internal_err!("array_slice got unexpected data type: {}", other);
708        }
709    };
710
711    let mut mutable =
712        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
713
714    // We must build `offsets` and `sizes` buffers manually as ListView does not enforce
715    // monotonically increasing offsets.
716    let mut offsets = Vec::with_capacity(array.len());
717    let mut sizes = Vec::with_capacity(array.len());
718    let mut current_offset = O::usize_as(0);
719
720    let nulls = combine_input_nulls(array, from_array, to_array, stride);
721
722    for row_index in 0..array.len() {
723        if nulls.as_ref().is_some_and(|n| n.is_null(row_index)) {
724            offsets.push(current_offset);
725            sizes.push(O::usize_as(0));
726            continue;
727        }
728
729        let len = array.value_size(row_index);
730
731        // Empty arrays always return an empty array.
732        if len == O::usize_as(0) {
733            offsets.push(current_offset);
734            sizes.push(O::usize_as(0));
735            continue;
736        }
737
738        let slice_plan = compute_slice_plan::<O>(
739            len,
740            from_array.value(row_index),
741            to_array.value(row_index),
742            stride.map(|s| s.value(row_index)),
743        )?;
744
745        let start = array.value_offset(row_index);
746        match slice_plan {
747            SlicePlan::Empty => {
748                offsets.push(current_offset);
749                sizes.push(O::usize_as(0));
750            }
751            SlicePlan::Contiguous {
752                start: rel_start,
753                len: slice_len,
754            } => {
755                let start_index = (start + rel_start).to_usize().unwrap();
756                let end_index = (start + rel_start + slice_len).to_usize().unwrap();
757                mutable.extend(0, start_index, end_index);
758                offsets.push(current_offset);
759                sizes.push(slice_len);
760                current_offset += slice_len;
761            }
762            SlicePlan::Indices(indices) => {
763                let count = indices.len();
764                for rel_index in indices {
765                    let absolute_index = (start + rel_index).to_usize().unwrap();
766                    mutable.extend(0, absolute_index, absolute_index + 1);
767                }
768                let length = O::usize_as(count);
769                offsets.push(current_offset);
770                sizes.push(length);
771                current_offset += length;
772            }
773        }
774    }
775
776    let data = mutable.freeze();
777
778    Ok(Arc::new(GenericListViewArray::<O>::try_new(
779        field,
780        ScalarBuffer::from(offsets),
781        ScalarBuffer::from(sizes),
782        arrow::array::make_array(data),
783        nulls,
784    )?))
785}
786
787#[user_doc(
788    doc_section(label = "Array Functions"),
789    description = "Returns the array without the first element.",
790    syntax_example = "array_pop_front(array)",
791    sql_example = r#"```sql
792> select array_pop_front([1, 2, 3]);
793+-------------------------------+
794| array_pop_front(List([1,2,3])) |
795+-------------------------------+
796| [2, 3]                        |
797+-------------------------------+
798```"#,
799    argument(
800        name = "array",
801        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
802    )
803)]
804#[derive(Debug, PartialEq, Eq, Hash)]
805pub(super) struct ArrayPopFront {
806    signature: Signature,
807    aliases: Vec<String>,
808}
809
810impl ArrayPopFront {
811    pub fn new() -> Self {
812        Self {
813            signature: Signature::array(Volatility::Immutable),
814            aliases: vec![String::from("list_pop_front")],
815        }
816    }
817}
818
819impl ScalarUDFImpl for ArrayPopFront {
820    fn name(&self) -> &str {
821        "array_pop_front"
822    }
823
824    fn signature(&self) -> &Signature {
825        &self.signature
826    }
827
828    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
829        Ok(arg_types[0].clone())
830    }
831
832    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
833        make_scalar_function(array_pop_front_inner)(&args.args)
834    }
835
836    fn aliases(&self) -> &[String] {
837        &self.aliases
838    }
839
840    fn documentation(&self) -> Option<&Documentation> {
841        self.doc()
842    }
843}
844
845/// array_pop_front SQL function
846fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
847    let array_data_type = args[0].data_type();
848    match array_data_type {
849        List(_) => {
850            let array = as_list_array(&args[0])?;
851            general_pop_front_list::<i32>(array)
852        }
853        LargeList(_) => {
854            let array = as_large_list_array(&args[0])?;
855            general_pop_front_list::<i64>(array)
856        }
857        _ => exec_err!("array_pop_front does not support type: {}", array_data_type),
858    }
859}
860
861fn general_pop_front_list<O: OffsetSizeTrait>(
862    array: &GenericListArray<O>,
863) -> Result<ArrayRef>
864where
865    i64: TryInto<O>,
866{
867    let from_array = Int64Array::from(vec![2; array.len()]);
868    let to_array = Int64Array::from(
869        array
870            .iter()
871            .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
872            .collect::<Vec<i64>>(),
873    );
874    general_array_slice::<O>(array, &from_array, &to_array, None)
875}
876
877#[user_doc(
878    doc_section(label = "Array Functions"),
879    description = "Returns the array without the last element.",
880    syntax_example = "array_pop_back(array)",
881    sql_example = r#"```sql
882> select array_pop_back([1, 2, 3]);
883+-------------------------------+
884| array_pop_back(List([1,2,3])) |
885+-------------------------------+
886| [1, 2]                        |
887+-------------------------------+
888```"#,
889    argument(
890        name = "array",
891        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
892    )
893)]
894#[derive(Debug, PartialEq, Eq, Hash)]
895pub(super) struct ArrayPopBack {
896    signature: Signature,
897    aliases: Vec<String>,
898}
899
900impl ArrayPopBack {
901    pub fn new() -> Self {
902        Self {
903            signature: Signature::array(Volatility::Immutable),
904            aliases: vec![String::from("list_pop_back")],
905        }
906    }
907}
908
909impl ScalarUDFImpl for ArrayPopBack {
910    fn name(&self) -> &str {
911        "array_pop_back"
912    }
913
914    fn signature(&self) -> &Signature {
915        &self.signature
916    }
917
918    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
919        Ok(arg_types[0].clone())
920    }
921
922    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
923        make_scalar_function(array_pop_back_inner)(&args.args)
924    }
925
926    fn aliases(&self) -> &[String] {
927        &self.aliases
928    }
929
930    fn documentation(&self) -> Option<&Documentation> {
931        self.doc()
932    }
933}
934
935/// array_pop_back SQL function
936fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
937    let [array] = take_function_args("array_pop_back", args)?;
938
939    match array.data_type() {
940        List(_) => {
941            let array = as_list_array(&array)?;
942            general_pop_back_list::<i32>(array)
943        }
944        LargeList(_) => {
945            let array = as_large_list_array(&array)?;
946            general_pop_back_list::<i64>(array)
947        }
948        _ => exec_err!(
949            "array_pop_back does not support type: {}",
950            array.data_type()
951        ),
952    }
953}
954
955fn general_pop_back_list<O: OffsetSizeTrait>(
956    array: &GenericListArray<O>,
957) -> Result<ArrayRef>
958where
959    i64: TryInto<O>,
960{
961    let from_array = Int64Array::from(vec![1; array.len()]);
962    let to_array = Int64Array::from(
963        array
964            .iter()
965            .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
966            .collect::<Vec<i64>>(),
967    );
968    general_array_slice::<O>(array, &from_array, &to_array, None)
969}
970
971#[user_doc(
972    doc_section(label = "Array Functions"),
973    description = "Returns the first non-null element in the array.",
974    syntax_example = "array_any_value(array)",
975    sql_example = r#"```sql
976> select array_any_value([NULL, 1, 2, 3]);
977+-------------------------------+
978| array_any_value(List([NULL,1,2,3])) |
979+-------------------------------------+
980| 1                                   |
981+-------------------------------------+
982```"#,
983    argument(
984        name = "array",
985        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
986    )
987)]
988#[derive(Debug, PartialEq, Eq, Hash)]
989pub(super) struct ArrayAnyValue {
990    signature: Signature,
991    aliases: Vec<String>,
992}
993
994impl ArrayAnyValue {
995    pub fn new() -> Self {
996        Self {
997            signature: Signature::array(Volatility::Immutable),
998            aliases: vec![String::from("list_any_value")],
999        }
1000    }
1001}
1002
1003impl ScalarUDFImpl for ArrayAnyValue {
1004    fn name(&self) -> &str {
1005        "array_any_value"
1006    }
1007    fn signature(&self) -> &Signature {
1008        &self.signature
1009    }
1010    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1011        match &arg_types[0] {
1012            List(field) | LargeList(field) | FixedSizeList(field, _) => {
1013                Ok(field.data_type().clone())
1014            }
1015            _ => plan_err!(
1016                "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
1017            ),
1018        }
1019    }
1020
1021    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1022        make_scalar_function(array_any_value_inner)(&args.args)
1023    }
1024
1025    fn aliases(&self) -> &[String] {
1026        &self.aliases
1027    }
1028
1029    fn documentation(&self) -> Option<&Documentation> {
1030        self.doc()
1031    }
1032}
1033
1034fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
1035    let [array] = take_function_args("array_any_value", args)?;
1036
1037    match &array.data_type() {
1038        List(_) => {
1039            let array = as_list_array(&array)?;
1040            general_array_any_value::<i32>(array)
1041        }
1042        LargeList(_) => {
1043            let array = as_large_list_array(&array)?;
1044            general_array_any_value::<i64>(array)
1045        }
1046        data_type => exec_err!("array_any_value does not support type: {data_type}"),
1047    }
1048}
1049
1050fn general_array_any_value<O: OffsetSizeTrait>(
1051    array: &GenericListArray<O>,
1052) -> Result<ArrayRef>
1053where
1054    i64: TryInto<O>,
1055{
1056    let values = array.values();
1057    let original_data = values.to_data();
1058    let capacity = Capacities::Array(array.len());
1059
1060    let mut mutable =
1061        MutableArrayData::with_capacities(vec![&original_data], true, capacity);
1062
1063    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
1064        let start = offset_window[0];
1065
1066        // array is null
1067        if array.is_null(row_index) {
1068            mutable.extend_nulls(1);
1069            continue;
1070        }
1071
1072        let row_value = array.value(row_index);
1073        match row_value.nulls() {
1074            Some(row_nulls_buffer) => {
1075                // nulls are present in the array so try to take the first valid element
1076                if let Some(first_non_null_index) =
1077                    row_nulls_buffer.valid_indices().next()
1078                {
1079                    let index = start.as_usize() + first_non_null_index;
1080                    mutable.extend(0, index, index + 1)
1081                } else {
1082                    // all the elements in the array are null
1083                    mutable.extend_nulls(1);
1084                }
1085            }
1086            None => {
1087                // no nulls are present in the array so take the first element
1088                let index = start.as_usize();
1089                mutable.extend(0, index, index + 1);
1090            }
1091        }
1092    }
1093
1094    let data = mutable.freeze();
1095    Ok(arrow::array::make_array(data))
1096}
1097
#[cfg(test)]
mod tests {
    use super::{
        array_element_udf, general_array_any_value, general_array_element,
        general_list_view_array_slice,
    };
    use arrow::array::{
        Array, ArrayRef, GenericListViewArray, Int32Array, Int64Array, ListViewArray,
        cast::AsArray,
    };
    use arrow::array::{ListArray, RecordBatch};
    use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema, Result, assert_batches_eq};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Materializes each row of an Int32 list-view array as a `Vec<i32>`.
    /// Panics if any element is NULL; only use with null-free fixtures.
    fn list_view_values(array: &GenericListViewArray<i32>) -> Vec<Vec<i32>> {
        (0..array.len())
            .map(|i| {
                let child = array.value(i);
                let values = child.as_any().downcast_ref::<Int32Array>().unwrap();
                values.iter().map(|v| v.unwrap()).collect()
            })
            .collect()
    }

    // Regression test for https://github.com/apache/datafusion/issues/13755
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let fixed_size_list_type = DataType::FixedSizeList(
            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
            13,
        );
        let array_type = DataType::List(
            Field::new_list_field(fixed_size_list_type.clone(), true).into(),
        );
        let index_type = DataType::Int64;

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("my_array", array_type.clone(), false),
                Field::new("my_index", index_type.clone(), false),
            ]
            .into(),
            HashMap::default(),
        )
        .unwrap();

        let udf = array_element_udf();

        // ScalarUDFImpl::return_type
        assert_eq!(
            udf.return_type(&[array_type.clone(), index_type.clone()])
                .unwrap(),
            fixed_size_list_type
        );

        // Via ExprSchemable::get_type (e.g. SimplifyInfo)
        let udf_expr = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: vec![
                Expr::Column(Column::new_unqualified("my_array")),
                Expr::Column(Column::new_unqualified("my_index")),
            ],
        });
        assert_eq!(
            ExprSchemable::get_type(&udf_expr, &schema).unwrap(),
            fixed_size_list_type
        );
    }

    #[test]
    fn test_array_element_null_handling() -> Result<()> {
        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 4, 5]));
        let nulls = NullBuffer::from(vec![true, false, true]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));

        let list_array = ListArray::new(field, offsets, values, Some(nulls));
        let indexes = Int64Array::from(vec![1, 1, 1]);

        let result = general_array_element(&list_array, &indexes)?;

        let expected = [
            "+--------+",
            "| result |",
            "+--------+",
            "| 1      |",
            "|        |",
            "| 5      |",
            "+--------+",
        ];

        let batch = RecordBatch::try_from_iter([("result", result)])?;

        assert_batches_eq!(expected, &[batch]);

        Ok(())
    }

    #[test]
    fn test_array_any_null_handling() -> Result<()> {
        // Rows: [1, 2, 3], NULL (whose offsets still span the value 4), [5].
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 4, 5]));
        let nulls = NullBuffer::from(vec![true, false, true]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));

        let list_array = ListArray::new(field, offsets, values, Some(nulls));

        let result = general_array_any_value(&list_array)?;
        let result = result
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("array_any_value over Int32 lists should yield an Int32Array");

        assert_eq!(result.len(), 3);
        assert!(!result.is_null(0));
        assert_eq!(result.value(0), 1);
        // The NULL row must stay NULL even though its offsets cover a value.
        assert!(result.is_null(1));
        assert!(!result.is_null(2));
        assert_eq!(result.value(2), 5);

        Ok(())
    }

    #[test]
    fn test_array_slice_list_view_basic() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![0, 3]);
        let sizes = ScalarBuffer::from(vec![3, 2]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![2, 1]);
        let to = Int64Array::from(vec![3, 2]);

        let result = general_list_view_array_slice::<i32>(
            &array,
            &from,
            &to,
            None::<&Int64Array>,
        )?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(list_view_values(result), vec![vec![2, 3], vec![4, 5]]);
        Ok(())
    }

    #[test]
    fn test_array_slice_list_view_non_monotonic_offsets() -> Result<()> {
        // First list references the tail of the values buffer, second list references the head.
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![3, 0]);
        let sizes = ScalarBuffer::from(vec![2, 3]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![1, 1]);
        let to = Int64Array::from(vec![2, 2]);

        let result = general_list_view_array_slice::<i32>(
            &array,
            &from,
            &to,
            None::<&Int64Array>,
        )?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(list_view_values(result), vec![vec![4, 5], vec![1, 2]]);
        Ok(())
    }

    #[test]
    fn test_array_slice_list_view_negative_stride() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![0, 3]);
        let sizes = ScalarBuffer::from(vec![3, 2]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![3, 2]);
        let to = Int64Array::from(vec![1, 1]);
        let stride = Int64Array::from(vec![-1, -1]);

        let result =
            general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(list_view_values(result), vec![vec![3, 2, 1], vec![5, 4]]);
        Ok(())
    }

    #[test]
    fn test_array_slice_list_view_out_of_order() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![3, 1, 0]);
        let sizes = ScalarBuffer::from(vec![2, 2, 1]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);
        assert_eq!(
            list_view_values(&array),
            vec![vec![4, 5], vec![2, 3], vec![1]]
        );

        let from = Int64Array::from(vec![2, 2, 2]);
        let to = Int64Array::from(vec![1, 1, 1]);
        let stride = Int64Array::from(vec![-1, -1, -1]);

        let result =
            general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(
            list_view_values(result),
            vec![vec![5, 4], vec![3, 2], vec![]]
        );
        Ok(())
    }

    #[test]
    fn test_array_slice_list_view_with_nulls() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            None,
            Some(3),
            Some(4),
            Some(5),
        ]));
        let offsets = ScalarBuffer::from(vec![0, 2, 5]);
        let sizes = ScalarBuffer::from(vec![2, 3, 0]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![1, 1, 1]);
        let to = Int64Array::from(vec![2, 2, 1]);

        let result = general_list_view_array_slice::<i32>(&array, &from, &to, None)?;
        let result = result.as_ref().as_list_view::<i32>();

        let actual: Vec<Vec<Option<i32>>> = (0..result.len())
            .map(|i| {
                result
                    .value(i)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .iter()
                    .collect()
            })
            .collect();

        assert_eq!(
            actual,
            vec![vec![Some(1), None], vec![Some(3), Some(4)], Vec::new()]
        );

        // Test with NULL stride - should return NULL for rows with NULL stride
        let stride_with_null = Int64Array::from(vec![Some(1), None, Some(1)]);
        let result = general_list_view_array_slice::<i32>(
            &array,
            &from,
            &to,
            Some(&stride_with_null),
        )?;
        let result = result.as_ref().as_list_view::<i32>();

        // First row: stride = 1, should return [1, None]
        // Second row: stride = NULL, should return NULL
        // Third row: stride = 1, empty array should return empty
        assert!(!result.is_null(0)); // First row should not be null
        assert!(result.is_null(1)); // Second row should be null (stride is NULL)
        assert!(!result.is_null(2)); // Third row should not be null

        let first_row: Vec<Option<i32>> = result
            .value(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .iter()
            .collect();
        assert_eq!(first_row, vec![Some(1), None]);

        Ok(())
    }
}