1use arrow::array::{
21 Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray, Int64Array,
22 MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::{OffsetBuffer, ScalarBuffer};
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27 DataType::{FixedSizeList, LargeList, LargeListView, List, ListView, Null},
28 Field,
29};
30use datafusion_common::cast::as_large_list_array;
31use datafusion_common::cast::as_list_array;
32use datafusion_common::cast::{
33 as_int64_array, as_large_list_view_array, as_list_view_array,
34};
35use datafusion_common::internal_err;
36use datafusion_common::utils::ListCoercion;
37use datafusion_common::{
38 Result, exec_datafusion_err, exec_err, internal_datafusion_err, plan_err,
39 utils::take_function_args,
40};
41use datafusion_expr::{
42 ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
43};
44use datafusion_expr::{
45 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
46};
47use datafusion_macros::user_doc;
48use std::any::Any;
49use std::sync::Arc;
50
51use crate::utils::make_scalar_function;
52
// UDF registrations for this module. Each `make_udf_expr_and_func!` invocation
// lists: the UDF struct, the `Expr` helper name, the helper's argument names, the
// helper's doc string, and the `*_udf()` accessor name. The macros are defined
// elsewhere in this crate — NOTE(review): confirm the exact generated items
// against the macro source.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `array_slice` takes an optional `stride`, so its `Expr` helper is written by
// hand further down; presumably only the `array_slice_udf()` accessor is
// generated here.
create_func!(ArraySlice, array_slice_udf);

make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
87
88#[user_doc(
89 doc_section(label = "Array Functions"),
90 description = "Extracts the element with the index n from the array.",
91 syntax_example = "array_element(array, index)",
92 sql_example = r#"```sql
93> select array_element([1, 2, 3, 4], 3);
94+-----------------------------------------+
95| array_element(List([1,2,3,4]),Int64(3)) |
96+-----------------------------------------+
97| 3 |
98+-----------------------------------------+
99```"#,
100 argument(
101 name = "array",
102 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
103 ),
104 argument(
105 name = "index",
106 description = "Index to extract the element from the array."
107 )
108)]
109#[derive(Debug, PartialEq, Eq, Hash)]
110pub struct ArrayElement {
111 signature: Signature,
112 aliases: Vec<String>,
113}
114
115impl Default for ArrayElement {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121impl ArrayElement {
122 pub fn new() -> Self {
123 Self {
124 signature: Signature::array_and_index(Volatility::Immutable),
125 aliases: vec![
126 String::from("array_extract"),
127 String::from("list_element"),
128 String::from("list_extract"),
129 ],
130 }
131 }
132}
133
134impl ScalarUDFImpl for ArrayElement {
135 fn as_any(&self) -> &dyn Any {
136 self
137 }
138 fn name(&self) -> &str {
139 "array_element"
140 }
141
142 fn display_name(&self, args: &[Expr]) -> Result<String> {
143 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
144 if args_name.len() != 2 {
145 return exec_err!("expect 2 args, got {}", args_name.len());
146 }
147
148 Ok(format!("{}[{}]", args_name[0], args_name[1]))
149 }
150
151 fn schema_name(&self, args: &[Expr]) -> Result<String> {
152 let args_name = args
153 .iter()
154 .map(|e| e.schema_name().to_string())
155 .collect::<Vec<_>>();
156 if args_name.len() != 2 {
157 return exec_err!("expect 2 args, got {}", args_name.len());
158 }
159
160 Ok(format!("{}[{}]", args_name[0], args_name[1]))
161 }
162
163 fn signature(&self) -> &Signature {
164 &self.signature
165 }
166
167 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
168 match &arg_types[0] {
169 Null => Ok(Null),
170 List(field) | LargeList(field) => Ok(field.data_type().clone()),
171 arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
172 }
173 }
174
175 fn invoke_with_args(
176 &self,
177 args: datafusion_expr::ScalarFunctionArgs,
178 ) -> Result<ColumnarValue> {
179 make_scalar_function(array_element_inner)(&args.args)
180 }
181
182 fn aliases(&self) -> &[String] {
183 &self.aliases
184 }
185
186 fn documentation(&self) -> Option<&Documentation> {
187 self.doc()
188 }
189}
190
191fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
199 let [array, indexes] = take_function_args("array_element", args)?;
200
201 match &array.data_type() {
202 Null => Ok(Arc::new(NullArray::new(array.len()))),
203 List(_) => {
204 let array = as_list_array(&array)?;
205 let indexes = as_int64_array(&indexes)?;
206 general_array_element::<i32>(array, indexes)
207 }
208 LargeList(_) => {
209 let array = as_large_list_array(&array)?;
210 let indexes = as_int64_array(&indexes)?;
211 general_array_element::<i64>(array, indexes)
212 }
213 arg_type => {
214 exec_err!("array_element does not support type {arg_type}")
215 }
216 }
217}
218
219fn general_array_element<O: OffsetSizeTrait>(
220 array: &GenericListArray<O>,
221 indexes: &Int64Array,
222) -> Result<ArrayRef>
223where
224 i64: TryInto<O>,
225{
226 let values = array.values();
227 if values.data_type().is_null() {
228 return Ok(Arc::new(NullArray::new(array.len())));
229 }
230
231 let original_data = values.to_data();
232 let capacity = Capacities::Array(original_data.len());
233
234 let mut mutable =
236 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
237
238 fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
239 where
240 i64: TryInto<O>,
241 {
242 let index: O = index.try_into().map_err(|_| {
243 exec_datafusion_err!("array_element got invalid index: {index}")
244 })?;
245 let adjusted_zero_index = if index < O::usize_as(0) {
247 index + len
248 } else {
249 index - O::usize_as(1)
250 };
251
252 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
253 Ok(Some(adjusted_zero_index))
254 } else {
255 Ok(None)
257 }
258 }
259
260 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
261 let start = offset_window[0];
262 let end = offset_window[1];
263 let len = end - start;
264
265 if len == O::usize_as(0) {
267 mutable.extend_nulls(1);
268 continue;
269 }
270
271 let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
272
273 if let Some(index) = index {
274 let start = start.as_usize() + index.as_usize();
275 mutable.extend(0, start, start + 1_usize);
276 } else {
277 mutable.extend_nulls(1);
279 }
280 }
281
282 let data = mutable.freeze();
283 Ok(arrow::array::make_array(data))
284}
285
286#[doc = "returns a slice of the array."]
287pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
288 let args = match stride {
289 Some(stride) => vec![array, begin, end, stride],
290 None => vec![array, begin, end],
291 };
292 array_slice_udf().call(args)
293}
294
295#[user_doc(
296 doc_section(label = "Array Functions"),
297 description = "Returns a slice of the array based on 1-indexed start and end positions.",
298 syntax_example = "array_slice(array, begin, end)",
299 sql_example = r#"```sql
300> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
301+--------------------------------------------------------+
302| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
303+--------------------------------------------------------+
304| [3, 4, 5, 6] |
305+--------------------------------------------------------+
306```"#,
307 argument(
308 name = "array",
309 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
310 ),
311 argument(
312 name = "begin",
313 description = "Index of the first element. If negative, it counts backward from the end of the array."
314 ),
315 argument(
316 name = "end",
317 description = "Index of the last element. If negative, it counts backward from the end of the array."
318 ),
319 argument(
320 name = "stride",
321 description = "Stride of the array slice. The default is 1."
322 )
323)]
324#[derive(Debug, PartialEq, Eq, Hash)]
325pub(super) struct ArraySlice {
326 signature: Signature,
327 aliases: Vec<String>,
328}
329
330impl ArraySlice {
331 pub fn new() -> Self {
332 Self {
333 signature: Signature::one_of(
334 vec![
335 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
336 arguments: vec![
337 ArrayFunctionArgument::Array,
338 ArrayFunctionArgument::Index,
339 ArrayFunctionArgument::Index,
340 ],
341 array_coercion: Some(ListCoercion::FixedSizedListToList),
342 }),
343 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
344 arguments: vec![
345 ArrayFunctionArgument::Array,
346 ArrayFunctionArgument::Index,
347 ArrayFunctionArgument::Index,
348 ArrayFunctionArgument::Index,
349 ],
350 array_coercion: Some(ListCoercion::FixedSizedListToList),
351 }),
352 ],
353 Volatility::Immutable,
354 ),
355 aliases: vec![String::from("list_slice")],
356 }
357 }
358}
359
360impl ScalarUDFImpl for ArraySlice {
361 fn as_any(&self) -> &dyn Any {
362 self
363 }
364
365 fn display_name(&self, args: &[Expr]) -> Result<String> {
366 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
367 if let Some((arr, indexes)) = args_name.split_first() {
368 Ok(format!("{arr}[{}]", indexes.join(":")))
369 } else {
370 exec_err!("no argument")
371 }
372 }
373
374 fn schema_name(&self, args: &[Expr]) -> Result<String> {
375 let args_name = args
376 .iter()
377 .map(|e| e.schema_name().to_string())
378 .collect::<Vec<_>>();
379 if let Some((arr, indexes)) = args_name.split_first() {
380 Ok(format!("{arr}[{}]", indexes.join(":")))
381 } else {
382 exec_err!("no argument")
383 }
384 }
385
386 fn name(&self) -> &str {
387 "array_slice"
388 }
389
390 fn signature(&self) -> &Signature {
391 &self.signature
392 }
393
394 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
395 Ok(arg_types[0].clone())
396 }
397
398 fn invoke_with_args(
399 &self,
400 args: datafusion_expr::ScalarFunctionArgs,
401 ) -> Result<ColumnarValue> {
402 make_scalar_function(array_slice_inner)(&args.args)
403 }
404
405 fn aliases(&self) -> &[String] {
406 &self.aliases
407 }
408
409 fn documentation(&self) -> Option<&Documentation> {
410 self.doc()
411 }
412}
413
414fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
431 let args_len = args.len();
432 if args_len != 3 && args_len != 4 {
433 return exec_err!("array_slice needs three or four arguments");
434 }
435
436 let stride = if args_len == 4 {
437 Some(as_int64_array(&args[3])?)
438 } else {
439 None
440 };
441
442 let from_array = as_int64_array(&args[1])?;
443 let to_array = as_int64_array(&args[2])?;
444
445 let array_data_type = args[0].data_type();
446 match array_data_type {
447 List(_) => {
448 let array = as_list_array(&args[0])?;
449 general_array_slice::<i32>(array, from_array, to_array, stride)
450 }
451 LargeList(_) => {
452 let array = as_large_list_array(&args[0])?;
453 general_array_slice::<i64>(array, from_array, to_array, stride)
454 }
455 ListView(_) => {
456 let array = as_list_view_array(&args[0])?;
457 general_list_view_array_slice::<i32>(array, from_array, to_array, stride)
458 }
459 LargeListView(_) => {
460 let array = as_large_list_view_array(&args[0])?;
461 general_list_view_array_slice::<i64>(array, from_array, to_array, stride)
462 }
463 _ => exec_err!("array_slice does not support type: {}", array_data_type),
464 }
465}
466
467fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
468where
469 i64: TryInto<O>,
470{
471 let adjusted_zero_index = if index < 0 {
473 if let Ok(index) = index.try_into() {
474 if index < (O::zero() - O::one()) * len {
481 O::zero()
482 } else {
483 index + len
484 }
485 } else {
486 return exec_err!("array_slice got invalid index: {}", index);
487 }
488 } else {
489 if let Ok(index) = index.try_into() {
491 std::cmp::max(index - O::usize_as(1), O::usize_as(0))
492 } else {
493 return exec_err!("array_slice got invalid index: {}", index);
494 }
495 };
496
497 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
498 Ok(Some(adjusted_zero_index))
499 } else {
500 Ok(None)
502 }
503}
504
505fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
506where
507 i64: TryInto<O>,
508{
509 let adjusted_zero_index = if index < 0 {
511 if let Ok(index) = index.try_into() {
513 index + len
514 } else {
515 return exec_err!("array_slice got invalid index: {}", index);
516 }
517 } else {
518 if let Ok(index) = index.try_into() {
520 std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
521 } else {
522 return exec_err!("array_slice got invalid index: {}", index);
523 }
524 };
525
526 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
527 Ok(Some(adjusted_zero_index))
528 } else {
529 Ok(None)
531 }
532}
533
/// The elements a slice selects from one list row, as computed by
/// `compute_slice_plan`. All positions are relative to the start of the row.
enum SlicePlan<O: OffsetSizeTrait> {
    /// The slice selects no elements.
    Empty,
    /// `len` consecutive elements starting at row-relative position `start`.
    /// Produced when the stride is 1.
    Contiguous { start: O, len: O },
    /// Explicit row-relative positions, in output order. Produced for any
    /// stride other than 1, including negative strides.
    Indices(Vec<O>),
}
547
548fn compute_slice_plan<O: OffsetSizeTrait>(
550 len: O,
551 from_raw: i64,
552 to_raw: i64,
553 stride_raw: Option<i64>,
554) -> Result<SlicePlan<O>>
555where
556 i64: TryInto<O>,
557{
558 if len == O::usize_as(0) {
559 return Ok(SlicePlan::Empty);
560 }
561
562 let from_index = adjusted_from_index::<O>(from_raw, len)?;
563 let to_index = adjusted_to_index::<O>(to_raw, len)?;
564
565 let (Some(from), Some(to)) = (from_index, to_index) else {
566 return Ok(SlicePlan::Empty);
567 };
568
569 let stride_value = stride_raw.unwrap_or(1);
570 if stride_value == 0 {
571 return exec_err!(
572 "array_slice got invalid stride: {:?}, it cannot be 0",
573 stride_value
574 );
575 }
576
577 if (from < to && stride_value.is_negative())
578 || (from > to && stride_value.is_positive())
579 {
580 return Ok(SlicePlan::Empty);
581 }
582
583 let stride: O = stride_value.try_into().map_err(|_| {
584 internal_datafusion_err!("array_slice got invalid stride: {}", stride_value)
585 })?;
586
587 if from <= to && stride_value.is_positive() {
588 if stride_value == 1 {
589 let len = to - from + O::usize_as(1);
590 Ok(SlicePlan::Contiguous { start: from, len })
591 } else {
592 let mut indices = Vec::new();
593 let mut index = from;
594 while index <= to {
595 indices.push(index);
596 index += stride;
597 }
598 Ok(SlicePlan::Indices(indices))
599 }
600 } else {
601 let mut indices = Vec::new();
602 let mut index = from;
603 while index >= to {
604 indices.push(index);
605 index += stride;
606 }
607 Ok(SlicePlan::Indices(indices))
608 }
609}
610
/// Slices every row of a `List`/`LargeList` array.
///
/// `from_array`, `to_array` and the optional `stride` supply per-row arguments,
/// interpreted by `compute_slice_plan`. An output row is null when the list row
/// or any of its slice arguments is null.
///
/// # Errors
/// Propagates errors from `compute_slice_plan` (invalid index or zero stride)
/// and from constructing the output `GenericListArray`.
fn general_array_slice<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    from_array: &Int64Array,
    to_array: &Int64Array,
    stride: Option<&Int64Array>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());

    // Builds the output's child values by copying the selected ranges out of
    // the input child values. `use_nulls = true` allows null placeholders.
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], true, capacity);

    // Output offsets. `offsets[row_index]` is the start of the row being built;
    // every iteration pushes exactly one end offset.
    let mut offsets = vec![O::usize_as(0)];
    let mut null_builder = NullBufferBuilder::new(array.len());

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let start = offset_window[0];
        let end = offset_window[1];
        let len = end - start;

        // Any null input makes the output row null. One null placeholder is
        // still appended to the child values and the offset advances by one;
        // the row's validity bit hides it.
        if array.is_null(row_index)
            || from_array.is_null(row_index)
            || to_array.is_null(row_index)
            || stride.is_some_and(|s| s.is_null(row_index))
        {
            mutable.extend_nulls(1);
            offsets.push(offsets[row_index] + O::usize_as(1));
            null_builder.append_null();
            continue;
        }
        null_builder.append_non_null();

        // Empty input row: emit an empty, non-null output row.
        if len == O::usize_as(0) {
            offsets.push(offsets[row_index]);
            continue;
        }

        let slice_plan = compute_slice_plan::<O>(
            len,
            from_array.value(row_index),
            to_array.value(row_index),
            stride.map(|s| s.value(row_index)),
        )?;

        // Plan positions are row-relative; adding the row's `start` offset
        // addresses the shared child array. The `unwrap`s rely on those sums
        // being non-negative.
        match slice_plan {
            SlicePlan::Empty => offsets.push(offsets[row_index]),
            SlicePlan::Contiguous {
                start: rel_start,
                len: slice_len,
            } => {
                let start_index = (start + rel_start).to_usize().unwrap();
                let end_index = (start + rel_start + slice_len).to_usize().unwrap();
                mutable.extend(0, start_index, end_index);
                offsets.push(offsets[row_index] + slice_len);
            }
            SlicePlan::Indices(indices) => {
                let count = indices.len();
                for rel_index in indices {
                    let absolute_index = (start + rel_index).to_usize().unwrap();
                    mutable.extend(0, absolute_index, absolute_index + 1);
                }
                offsets.push(offsets[row_index] + O::usize_as(count));
            }
        }
    }

    let data = mutable.freeze();

    // The child field is declared nullable (`true`). The child array can
    // contain the null placeholders appended for null rows above.
    Ok(Arc::new(GenericListArray::<O>::try_new(
        Arc::new(Field::new_list_field(array.value_type(), true)),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow::array::make_array(data),
        null_builder.finish(),
    )?))
}
695
/// Slices every row of a `ListView`/`LargeListView` array.
///
/// Row arguments are interpreted by `compute_slice_plan`, as for
/// `general_array_slice`. The selected values are copied into a fresh child
/// array and laid out back to back in row order, whatever the order of the
/// input views. Rows that are null (null list or any null argument) or empty
/// get size 0 and contribute no child values.
///
/// # Errors
/// Returns an internal error if `array`'s type is not a list view. Also
/// propagates errors from `compute_slice_plan` and from building the output.
fn general_list_view_array_slice<O: OffsetSizeTrait>(
    array: &GenericListViewArray<O>,
    from_array: &Int64Array,
    to_array: &Int64Array,
    stride: Option<&Int64Array>,
) -> Result<ArrayRef>
where
    i64: TryInto<O>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());
    // The output reuses the input's element field (name, type, nullability).
    let field = match array.data_type() {
        ListView(field) | LargeListView(field) => Arc::clone(field),
        other => {
            return internal_err!("array_slice got unexpected data type: {}", other);
        }
    };

    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], true, capacity);

    // Output view buffers: one offset and one size per row. `current_offset`
    // is the end of the values written so far into the new child array.
    let mut offsets = Vec::with_capacity(array.len());
    let mut sizes = Vec::with_capacity(array.len());
    let mut current_offset = O::usize_as(0);
    let mut null_builder = NullBufferBuilder::new(array.len());

    for row_index in 0..array.len() {
        // Null input: a null row with an empty view; no child values are written.
        if array.is_null(row_index)
            || from_array.is_null(row_index)
            || to_array.is_null(row_index)
            || stride.is_some_and(|s| s.is_null(row_index))
        {
            null_builder.append_null();
            offsets.push(current_offset);
            sizes.push(O::usize_as(0));
            continue;
        }
        null_builder.append_non_null();

        let len = array.value_size(row_index);

        // Empty input row: emit an empty, non-null row.
        if len == O::usize_as(0) {
            offsets.push(current_offset);
            sizes.push(O::usize_as(0));
            continue;
        }

        let slice_plan = compute_slice_plan::<O>(
            len,
            from_array.value(row_index),
            to_array.value(row_index),
            stride.map(|s| s.value(row_index)),
        )?;

        // List-view offsets need not be monotonic, so the row's start comes
        // from its own view rather than from neighbouring rows.
        let start = array.value_offset(row_index);
        match slice_plan {
            SlicePlan::Empty => {
                offsets.push(current_offset);
                sizes.push(O::usize_as(0));
            }
            SlicePlan::Contiguous {
                start: rel_start,
                len: slice_len,
            } => {
                let start_index = (start + rel_start).to_usize().unwrap();
                let end_index = (start + rel_start + slice_len).to_usize().unwrap();
                mutable.extend(0, start_index, end_index);
                offsets.push(current_offset);
                sizes.push(slice_len);
                current_offset += slice_len;
            }
            SlicePlan::Indices(indices) => {
                let count = indices.len();
                for rel_index in indices {
                    let absolute_index = (start + rel_index).to_usize().unwrap();
                    mutable.extend(0, absolute_index, absolute_index + 1);
                }
                let length = O::usize_as(count);
                offsets.push(current_offset);
                sizes.push(length);
                current_offset += length;
            }
        }
    }

    let data = mutable.freeze();

    Ok(Arc::new(GenericListViewArray::<O>::try_new(
        field,
        ScalarBuffer::from(offsets),
        ScalarBuffer::from(sizes),
        arrow::array::make_array(data),
        null_builder.finish(),
    )?))
}
796
797#[user_doc(
798 doc_section(label = "Array Functions"),
799 description = "Returns the array without the first element.",
800 syntax_example = "array_pop_front(array)",
801 sql_example = r#"```sql
802> select array_pop_front([1, 2, 3]);
803+-------------------------------+
804| array_pop_front(List([1,2,3])) |
805+-------------------------------+
806| [2, 3] |
807+-------------------------------+
808```"#,
809 argument(
810 name = "array",
811 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
812 )
813)]
814#[derive(Debug, PartialEq, Eq, Hash)]
815pub(super) struct ArrayPopFront {
816 signature: Signature,
817 aliases: Vec<String>,
818}
819
820impl ArrayPopFront {
821 pub fn new() -> Self {
822 Self {
823 signature: Signature::array(Volatility::Immutable),
824 aliases: vec![String::from("list_pop_front")],
825 }
826 }
827}
828
829impl ScalarUDFImpl for ArrayPopFront {
830 fn as_any(&self) -> &dyn Any {
831 self
832 }
833 fn name(&self) -> &str {
834 "array_pop_front"
835 }
836
837 fn signature(&self) -> &Signature {
838 &self.signature
839 }
840
841 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
842 Ok(arg_types[0].clone())
843 }
844
845 fn invoke_with_args(
846 &self,
847 args: datafusion_expr::ScalarFunctionArgs,
848 ) -> Result<ColumnarValue> {
849 make_scalar_function(array_pop_front_inner)(&args.args)
850 }
851
852 fn aliases(&self) -> &[String] {
853 &self.aliases
854 }
855
856 fn documentation(&self) -> Option<&Documentation> {
857 self.doc()
858 }
859}
860
861fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
863 let array_data_type = args[0].data_type();
864 match array_data_type {
865 List(_) => {
866 let array = as_list_array(&args[0])?;
867 general_pop_front_list::<i32>(array)
868 }
869 LargeList(_) => {
870 let array = as_large_list_array(&args[0])?;
871 general_pop_front_list::<i64>(array)
872 }
873 _ => exec_err!("array_pop_front does not support type: {}", array_data_type),
874 }
875}
876
877fn general_pop_front_list<O: OffsetSizeTrait>(
878 array: &GenericListArray<O>,
879) -> Result<ArrayRef>
880where
881 i64: TryInto<O>,
882{
883 let from_array = Int64Array::from(vec![2; array.len()]);
884 let to_array = Int64Array::from(
885 array
886 .iter()
887 .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
888 .collect::<Vec<i64>>(),
889 );
890 general_array_slice::<O>(array, &from_array, &to_array, None)
891}
892
893#[user_doc(
894 doc_section(label = "Array Functions"),
895 description = "Returns the array without the last element.",
896 syntax_example = "array_pop_back(array)",
897 sql_example = r#"```sql
898> select array_pop_back([1, 2, 3]);
899+-------------------------------+
900| array_pop_back(List([1,2,3])) |
901+-------------------------------+
902| [1, 2] |
903+-------------------------------+
904```"#,
905 argument(
906 name = "array",
907 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
908 )
909)]
910#[derive(Debug, PartialEq, Eq, Hash)]
911pub(super) struct ArrayPopBack {
912 signature: Signature,
913 aliases: Vec<String>,
914}
915
916impl ArrayPopBack {
917 pub fn new() -> Self {
918 Self {
919 signature: Signature::array(Volatility::Immutable),
920 aliases: vec![String::from("list_pop_back")],
921 }
922 }
923}
924
925impl ScalarUDFImpl for ArrayPopBack {
926 fn as_any(&self) -> &dyn Any {
927 self
928 }
929 fn name(&self) -> &str {
930 "array_pop_back"
931 }
932
933 fn signature(&self) -> &Signature {
934 &self.signature
935 }
936
937 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
938 Ok(arg_types[0].clone())
939 }
940
941 fn invoke_with_args(
942 &self,
943 args: datafusion_expr::ScalarFunctionArgs,
944 ) -> Result<ColumnarValue> {
945 make_scalar_function(array_pop_back_inner)(&args.args)
946 }
947
948 fn aliases(&self) -> &[String] {
949 &self.aliases
950 }
951
952 fn documentation(&self) -> Option<&Documentation> {
953 self.doc()
954 }
955}
956
957fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
959 let [array] = take_function_args("array_pop_back", args)?;
960
961 match array.data_type() {
962 List(_) => {
963 let array = as_list_array(&array)?;
964 general_pop_back_list::<i32>(array)
965 }
966 LargeList(_) => {
967 let array = as_large_list_array(&array)?;
968 general_pop_back_list::<i64>(array)
969 }
970 _ => exec_err!(
971 "array_pop_back does not support type: {}",
972 array.data_type()
973 ),
974 }
975}
976
977fn general_pop_back_list<O: OffsetSizeTrait>(
978 array: &GenericListArray<O>,
979) -> Result<ArrayRef>
980where
981 i64: TryInto<O>,
982{
983 let from_array = Int64Array::from(vec![1; array.len()]);
984 let to_array = Int64Array::from(
985 array
986 .iter()
987 .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
988 .collect::<Vec<i64>>(),
989 );
990 general_array_slice::<O>(array, &from_array, &to_array, None)
991}
992
993#[user_doc(
994 doc_section(label = "Array Functions"),
995 description = "Returns the first non-null element in the array.",
996 syntax_example = "array_any_value(array)",
997 sql_example = r#"```sql
998> select array_any_value([NULL, 1, 2, 3]);
999+-------------------------------+
1000| array_any_value(List([NULL,1,2,3])) |
1001+-------------------------------------+
1002| 1 |
1003+-------------------------------------+
1004```"#,
1005 argument(
1006 name = "array",
1007 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
1008 )
1009)]
1010#[derive(Debug, PartialEq, Eq, Hash)]
1011pub(super) struct ArrayAnyValue {
1012 signature: Signature,
1013 aliases: Vec<String>,
1014}
1015
1016impl ArrayAnyValue {
1017 pub fn new() -> Self {
1018 Self {
1019 signature: Signature::array(Volatility::Immutable),
1020 aliases: vec![String::from("list_any_value")],
1021 }
1022 }
1023}
1024
1025impl ScalarUDFImpl for ArrayAnyValue {
1026 fn as_any(&self) -> &dyn Any {
1027 self
1028 }
1029 fn name(&self) -> &str {
1030 "array_any_value"
1031 }
1032 fn signature(&self) -> &Signature {
1033 &self.signature
1034 }
1035 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1036 match &arg_types[0] {
1037 List(field) | LargeList(field) | FixedSizeList(field, _) => {
1038 Ok(field.data_type().clone())
1039 }
1040 _ => plan_err!(
1041 "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
1042 ),
1043 }
1044 }
1045
1046 fn invoke_with_args(
1047 &self,
1048 args: datafusion_expr::ScalarFunctionArgs,
1049 ) -> Result<ColumnarValue> {
1050 make_scalar_function(array_any_value_inner)(&args.args)
1051 }
1052
1053 fn aliases(&self) -> &[String] {
1054 &self.aliases
1055 }
1056
1057 fn documentation(&self) -> Option<&Documentation> {
1058 self.doc()
1059 }
1060}
1061
1062fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
1063 let [array] = take_function_args("array_any_value", args)?;
1064
1065 match &array.data_type() {
1066 List(_) => {
1067 let array = as_list_array(&array)?;
1068 general_array_any_value::<i32>(array)
1069 }
1070 LargeList(_) => {
1071 let array = as_large_list_array(&array)?;
1072 general_array_any_value::<i64>(array)
1073 }
1074 data_type => exec_err!("array_any_value does not support type: {data_type}"),
1075 }
1076}
1077
1078fn general_array_any_value<O: OffsetSizeTrait>(
1079 array: &GenericListArray<O>,
1080) -> Result<ArrayRef>
1081where
1082 i64: TryInto<O>,
1083{
1084 let values = array.values();
1085 let original_data = values.to_data();
1086 let capacity = Capacities::Array(array.len());
1087
1088 let mut mutable =
1089 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
1090
1091 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
1092 let start = offset_window[0];
1093 let end = offset_window[1];
1094 let len = end - start;
1095
1096 if len == O::usize_as(0) {
1098 mutable.extend_nulls(1);
1099 continue;
1100 }
1101
1102 let row_value = array.value(row_index);
1103 match row_value.nulls() {
1104 Some(row_nulls_buffer) => {
1105 if let Some(first_non_null_index) =
1107 row_nulls_buffer.valid_indices().next()
1108 {
1109 let index = start.as_usize() + first_non_null_index;
1110 mutable.extend(0, index, index + 1)
1111 } else {
1112 mutable.extend_nulls(1);
1114 }
1115 }
1116 None => {
1117 let index = start.as_usize();
1119 mutable.extend(0, index, index + 1);
1120 }
1121 }
1122 }
1123
1124 let data = mutable.freeze();
1125 Ok(arrow::array::make_array(data))
1126}
1127
// Unit tests for `array_element` return-type resolution and for the ListView
// slicing kernel `general_list_view_array_slice`.
#[cfg(test)]
mod tests {
    use super::{array_element_udf, general_list_view_array_slice};
    use arrow::array::{
        Array, ArrayRef, GenericListViewArray, Int32Array, Int64Array, ListViewArray,
        cast::AsArray,
    };
    use arrow::buffer::ScalarBuffer;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema, Result};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;
    use std::sync::Arc;

    // Collects each row of an `Int32` list view into a `Vec`. Panics on null
    // elements, so only use it for results known to contain no nulls.
    fn list_view_values(array: &GenericListViewArray<i32>) -> Vec<Vec<i32>> {
        (0..array.len())
            .map(|i| {
                let child = array.value(i);
                let values = child.as_any().downcast_ref::<Int32Array>().unwrap();
                values.iter().map(|v| v.unwrap()).collect()
            })
            .collect()
    }

    // `array_element` on `List<FixedSizeList<Int32>>` must return the
    // `FixedSizeList` element type, both via `return_type` and via planning.
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let fixed_size_list_type = DataType::FixedSizeList(
            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
            13,
        );
        let array_type = DataType::List(
            Field::new_list_field(fixed_size_list_type.clone(), true).into(),
        );
        let index_type = DataType::Int64;

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("my_array", array_type.clone(), false),
                Field::new("my_index", index_type.clone(), false),
            ]
            .into(),
            HashMap::default(),
        )
        .unwrap();

        let udf = array_element_udf();

        // Direct return-type resolution.
        assert_eq!(
            udf.return_type(&[array_type.clone(), index_type.clone()])
                .unwrap(),
            fixed_size_list_type
        );

        // Type resolution through an expression planned against a schema.
        let udf_expr = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: vec![
                Expr::Column(Column::new_unqualified("my_array")),
                Expr::Column(Column::new_unqualified("my_index")),
            ],
        });
        assert_eq!(
            ExprSchemable::get_type(&udf_expr, &schema).unwrap(),
            fixed_size_list_type
        );
    }

    // Contiguous slices (stride 1) of two list-view rows.
    #[test]
    fn test_array_slice_list_view_basic() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![0, 3]);
        let sizes = ScalarBuffer::from(vec![3, 2]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![2, 1]);
        let to = Int64Array::from(vec![3, 2]);

        let result = general_list_view_array_slice::<i32>(
            &array,
            &from,
            &to,
            None::<&Int64Array>,
        )?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(list_view_values(result), vec![vec![2, 3], vec![4, 5]]);
        Ok(())
    }

    // Views whose offsets are not in ascending order must still be sliced
    // from their own start positions.
    #[test]
    fn test_array_slice_list_view_non_monotonic_offsets() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![3, 0]);
        let sizes = ScalarBuffer::from(vec![2, 3]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![1, 1]);
        let to = Int64Array::from(vec![2, 2]);

        let result = general_list_view_array_slice::<i32>(
            &array,
            &from,
            &to,
            None::<&Int64Array>,
        )?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(list_view_values(result), vec![vec![4, 5], vec![1, 2]]);
        Ok(())
    }

    // A negative stride walks each row backwards from `from` to `to`.
    #[test]
    fn test_array_slice_list_view_negative_stride() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![0, 3]);
        let sizes = ScalarBuffer::from(vec![3, 2]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![3, 2]);
        let to = Int64Array::from(vec![1, 1]);
        let stride = Int64Array::from(vec![-1, -1]);

        let result =
            general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(list_view_values(result), vec![vec![3, 2, 1], vec![5, 4]]);
        Ok(())
    }

    // Out-of-order views with a negative stride. The last row has a single
    // element, so a `from` of 2 is out of bounds and the slice is empty.
    #[test]
    fn test_array_slice_list_view_out_of_order() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let offsets = ScalarBuffer::from(vec![3, 1, 0]);
        let sizes = ScalarBuffer::from(vec![2, 2, 1]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);
        assert_eq!(
            list_view_values(&array),
            vec![vec![4, 5], vec![2, 3], vec![1]]
        );

        let from = Int64Array::from(vec![2, 2, 2]);
        let to = Int64Array::from(vec![1, 1, 1]);
        let stride = Int64Array::from(vec![-1, -1, -1]);

        let result =
            general_list_view_array_slice::<i32>(&array, &from, &to, Some(&stride))?;
        let result = result.as_ref().as_list_view::<i32>();

        assert_eq!(
            list_view_values(result),
            vec![vec![5, 4], vec![3, 2], vec![]]
        );
        Ok(())
    }

    // Null elements inside rows are preserved, empty rows stay empty, and a
    // null stride makes the corresponding output row null.
    #[test]
    fn test_array_slice_list_view_with_nulls() -> Result<()> {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            None,
            Some(3),
            Some(4),
            Some(5),
        ]));
        let offsets = ScalarBuffer::from(vec![0, 2, 5]);
        let sizes = ScalarBuffer::from(vec![2, 3, 0]);
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let array = ListViewArray::new(field, offsets, sizes, values, None);

        let from = Int64Array::from(vec![1, 1, 1]);
        let to = Int64Array::from(vec![2, 2, 1]);

        let result = general_list_view_array_slice::<i32>(&array, &from, &to, None)?;
        let result = result.as_ref().as_list_view::<i32>();

        let actual: Vec<Vec<Option<i32>>> = (0..result.len())
            .map(|i| {
                result
                    .value(i)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .iter()
                    .collect()
            })
            .collect();

        assert_eq!(
            actual,
            vec![vec![Some(1), None], vec![Some(3), Some(4)], Vec::new(),]
        );

        // Row 1's stride is null, so that output row must be null.
        let stride_with_null = Int64Array::from(vec![Some(1), None, Some(1)]);
        let result = general_list_view_array_slice::<i32>(
            &array,
            &from,
            &to,
            Some(&stride_with_null),
        )?;
        let result = result.as_ref().as_list_view::<i32>();

        assert!(!result.is_null(0));
        assert!(result.is_null(1));
        assert!(!result.is_null(2));
        // Non-null rows are still sliced normally.
        let first_row: Vec<Option<i32>> = result
            .value(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .iter()
            .collect();
        assert_eq!(first_row, vec![Some(1), None]);

        Ok(())
    }
}