1use arrow::array::{
21 Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray, Int64Array,
22 MutableArrayData, NullArray, OffsetSizeTrait,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27 DataType::{FixedSizeList, LargeList, LargeListView, List, ListView, Null},
28 Field,
29};
30use datafusion_common::cast::as_large_list_array;
31use datafusion_common::cast::as_list_array;
32use datafusion_common::cast::{
33 as_int64_array, as_large_list_view_array, as_list_view_array,
34};
35use datafusion_common::internal_err;
36use datafusion_common::utils::ListCoercion;
37use datafusion_common::{
38 Result, exec_datafusion_err, exec_err, internal_datafusion_err, plan_err,
39 utils::take_function_args,
40};
41use datafusion_expr::{
42 ArrayFunctionArgument, ArrayFunctionSignature, Expr, ScalarFunctionArgs,
43 TypeSignature,
44};
45use datafusion_expr::{
46 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
47};
48use datafusion_macros::user_doc;
49use std::sync::Arc;
50
51use crate::utils::make_scalar_function;
52
// Registration of the UDFs implemented in this file. The macros are defined
// elsewhere in the crate.
// NOTE(review): each `make_udf_expr_and_func!` call presumably emits an
// expression-builder function (second argument, taking the listed parameters)
// plus a UDF accessor (last argument) — confirm against the macro definition.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `array_slice` takes an optional `stride`, so its expression builder is
// hand-written in this file and only the `array_slice_udf` accessor is
// created here.
create_func!(ArraySlice, array_slice_udf);

make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
87
/// Scalar UDF behind `array_element(array, index)`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Extracts the element with the index n from the array.",
    syntax_example = "array_element(array, index)",
    sql_example = r#"```sql
> select array_element([1, 2, 3, 4], 3);
+-----------------------------------------+
| array_element(List([1,2,3,4]),Int64(3)) |
+-----------------------------------------+
| 3 |
+-----------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "index",
        description = "Index to extract the element from the array."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayElement {
    // Accepted argument types; handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative function names; handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
114
115impl Default for ArrayElement {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121impl ArrayElement {
122 pub fn new() -> Self {
123 Self {
124 signature: Signature::array_and_index(Volatility::Immutable),
125 aliases: vec![
126 String::from("array_extract"),
127 String::from("list_element"),
128 String::from("list_extract"),
129 ],
130 }
131 }
132}
133
134impl ScalarUDFImpl for ArrayElement {
135 fn name(&self) -> &str {
136 "array_element"
137 }
138
139 fn display_name(&self, args: &[Expr]) -> Result<String> {
140 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141 if args_name.len() != 2 {
142 return exec_err!("expect 2 args, got {}", args_name.len());
143 }
144
145 Ok(format!("{}[{}]", args_name[0], args_name[1]))
146 }
147
148 fn schema_name(&self, args: &[Expr]) -> Result<String> {
149 let args_name = args
150 .iter()
151 .map(|e| e.schema_name().to_string())
152 .collect::<Vec<_>>();
153 if args_name.len() != 2 {
154 return exec_err!("expect 2 args, got {}", args_name.len());
155 }
156
157 Ok(format!("{}[{}]", args_name[0], args_name[1]))
158 }
159
160 fn signature(&self) -> &Signature {
161 &self.signature
162 }
163
164 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165 match &arg_types[0] {
166 Null => Ok(Null),
167 List(field) | LargeList(field) => Ok(field.data_type().clone()),
168 arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
169 }
170 }
171
172 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
173 make_scalar_function(array_element_inner)(&args.args)
174 }
175
176 fn aliases(&self) -> &[String] {
177 &self.aliases
178 }
179
180 fn documentation(&self) -> Option<&Documentation> {
181 self.doc()
182 }
183}
184
185fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
193 let [array, indexes] = take_function_args("array_element", args)?;
194
195 match &array.data_type() {
196 Null => Ok(Arc::new(NullArray::new(array.len()))),
197 List(_) => {
198 let array = as_list_array(&array)?;
199 let indexes = as_int64_array(&indexes)?;
200 general_array_element::<i32>(array, indexes)
201 }
202 LargeList(_) => {
203 let array = as_large_list_array(&array)?;
204 let indexes = as_int64_array(&indexes)?;
205 general_array_element::<i64>(array, indexes)
206 }
207 arg_type => {
208 exec_err!("array_element does not support type {arg_type}")
209 }
210 }
211}
212
213fn general_array_element<O: OffsetSizeTrait>(
214 array: &GenericListArray<O>,
215 indexes: &Int64Array,
216) -> Result<ArrayRef>
217where
218 i64: TryInto<O>,
219{
220 let values = array.values();
221 if values.data_type().is_null() {
222 return Ok(Arc::new(NullArray::new(array.len())));
223 }
224
225 let original_data = values.to_data();
226 let capacity = Capacities::Array(original_data.len());
227
228 let mut mutable =
230 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
231
232 fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
233 where
234 i64: TryInto<O>,
235 {
236 let index: O = index.try_into().map_err(|_| {
237 exec_datafusion_err!("array_element got invalid index: {index}")
238 })?;
239 let adjusted_zero_index = if index < O::usize_as(0) {
241 index + len
242 } else {
243 index - O::usize_as(1)
244 };
245
246 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
247 Ok(Some(adjusted_zero_index))
248 } else {
249 Ok(None)
251 }
252 }
253
254 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
255 let start = offset_window[0];
256 let end = offset_window[1];
257 let len = end - start;
258
259 if array.is_null(row_index) {
261 mutable.extend_nulls(1);
262 continue;
263 }
264
265 let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
266
267 if let Some(index) = index {
268 let start = start.as_usize() + index.as_usize();
269 mutable.extend(0, start, start + 1_usize);
270 } else {
271 mutable.extend_nulls(1);
273 }
274 }
275
276 let data = mutable.freeze();
277 Ok(arrow::array::make_array(data))
278}
279
280#[doc = "returns a slice of the array."]
281pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
282 let args = match stride {
283 Some(stride) => vec![array, begin, end, stride],
284 None => vec![array, begin, end],
285 };
286 array_slice_udf().call(args)
287}
288
/// Scalar UDF behind `array_slice(array, begin, end[, stride])`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns a slice of the array based on 1-indexed start and end positions.",
    syntax_example = "array_slice(array, begin, end)",
    sql_example = r#"```sql
> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
+--------------------------------------------------------+
| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
+--------------------------------------------------------+
| [3, 4, 5, 6] |
+--------------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "begin",
        description = "Index of the first element. If negative, it counts backward from the end of the array."
    ),
    argument(
        name = "end",
        description = "Index of the last element. If negative, it counts backward from the end of the array."
    ),
    argument(
        name = "stride",
        description = "Stride of the array slice. The default is 1."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct ArraySlice {
    // Accepted argument types; handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative function names; handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
323
324impl ArraySlice {
325 pub fn new() -> Self {
326 Self {
327 signature: Signature::one_of(
328 vec![
329 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
330 arguments: vec![
331 ArrayFunctionArgument::Array,
332 ArrayFunctionArgument::Index,
333 ArrayFunctionArgument::Index,
334 ],
335 array_coercion: Some(ListCoercion::FixedSizedListToList),
336 }),
337 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
338 arguments: vec![
339 ArrayFunctionArgument::Array,
340 ArrayFunctionArgument::Index,
341 ArrayFunctionArgument::Index,
342 ArrayFunctionArgument::Index,
343 ],
344 array_coercion: Some(ListCoercion::FixedSizedListToList),
345 }),
346 ],
347 Volatility::Immutable,
348 ),
349 aliases: vec![String::from("list_slice")],
350 }
351 }
352}
353
354impl ScalarUDFImpl for ArraySlice {
355 fn display_name(&self, args: &[Expr]) -> Result<String> {
356 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
357 if let Some((arr, indexes)) = args_name.split_first() {
358 Ok(format!("{arr}[{}]", indexes.join(":")))
359 } else {
360 exec_err!("no argument")
361 }
362 }
363
364 fn schema_name(&self, args: &[Expr]) -> Result<String> {
365 let args_name = args
366 .iter()
367 .map(|e| e.schema_name().to_string())
368 .collect::<Vec<_>>();
369 if let Some((arr, indexes)) = args_name.split_first() {
370 Ok(format!("{arr}[{}]", indexes.join(":")))
371 } else {
372 exec_err!("no argument")
373 }
374 }
375
376 fn name(&self) -> &str {
377 "array_slice"
378 }
379
380 fn signature(&self) -> &Signature {
381 &self.signature
382 }
383
384 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
385 Ok(arg_types[0].clone())
386 }
387
388 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
389 make_scalar_function(array_slice_inner)(&args.args)
390 }
391
392 fn aliases(&self) -> &[String] {
393 &self.aliases
394 }
395
396 fn documentation(&self) -> Option<&Documentation> {
397 self.doc()
398 }
399}
400
401fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
418 let args_len = args.len();
419 if args_len != 3 && args_len != 4 {
420 return exec_err!("array_slice needs three or four arguments");
421 }
422
423 let stride = if args_len == 4 {
424 Some(as_int64_array(&args[3])?)
425 } else {
426 None
427 };
428
429 let from_array = as_int64_array(&args[1])?;
430 let to_array = as_int64_array(&args[2])?;
431
432 let array_data_type = args[0].data_type();
433 match array_data_type {
434 List(_) => {
435 let array = as_list_array(&args[0])?;
436 general_array_slice::<i32>(array, from_array, to_array, stride)
437 }
438 LargeList(_) => {
439 let array = as_large_list_array(&args[0])?;
440 general_array_slice::<i64>(array, from_array, to_array, stride)
441 }
442 ListView(_) => {
443 let array = as_list_view_array(&args[0])?;
444 general_list_view_array_slice::<i32>(array, from_array, to_array, stride)
445 }
446 LargeListView(_) => {
447 let array = as_large_list_view_array(&args[0])?;
448 general_list_view_array_slice::<i64>(array, from_array, to_array, stride)
449 }
450 _ => exec_err!("array_slice does not support type: {}", array_data_type),
451 }
452}
453
454fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
455where
456 i64: TryInto<O>,
457{
458 let adjusted_zero_index = if index < 0 {
460 if let Ok(index) = index.try_into() {
461 if index < (O::zero() - O::one()) * len {
468 O::zero()
469 } else {
470 index + len
471 }
472 } else {
473 return exec_err!("array_slice got invalid index: {}", index);
474 }
475 } else {
476 if let Ok(index) = index.try_into() {
478 std::cmp::max(index - O::usize_as(1), O::usize_as(0))
479 } else {
480 return exec_err!("array_slice got invalid index: {}", index);
481 }
482 };
483
484 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
485 Ok(Some(adjusted_zero_index))
486 } else {
487 Ok(None)
489 }
490}
491
492fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
493where
494 i64: TryInto<O>,
495{
496 let adjusted_zero_index = if index < 0 {
498 if let Ok(index) = index.try_into() {
500 index + len
501 } else {
502 return exec_err!("array_slice got invalid index: {}", index);
503 }
504 } else {
505 if let Ok(index) = index.try_into() {
507 std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
508 } else {
509 return exec_err!("array_slice got invalid index: {}", index);
510 }
511 };
512
513 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
514 Ok(Some(adjusted_zero_index))
515 } else {
516 Ok(None)
518 }
519}
520
/// Which elements of a single list row `array_slice` selects. All positions
/// are zero-based and relative to the start of the row; callers add the
/// row's own offset before copying.
enum SlicePlan<O: OffsetSizeTrait> {
    /// Nothing is selected.
    Empty,
    /// `len` consecutive elements beginning at `start`.
    Contiguous { start: O, len: O },
    /// Individually listed elements, in output order.
    Indices(Vec<O>),
}
534
535fn compute_slice_plan<O: OffsetSizeTrait>(
537 len: O,
538 from_raw: i64,
539 to_raw: i64,
540 stride_raw: Option<i64>,
541) -> Result<SlicePlan<O>>
542where
543 i64: TryInto<O>,
544{
545 if len == O::usize_as(0) {
546 return Ok(SlicePlan::Empty);
547 }
548
549 let from_index = adjusted_from_index::<O>(from_raw, len)?;
550 let to_index = adjusted_to_index::<O>(to_raw, len)?;
551
552 let (Some(from), Some(to)) = (from_index, to_index) else {
553 return Ok(SlicePlan::Empty);
554 };
555
556 let stride_value = stride_raw.unwrap_or(1);
557 if stride_value == 0 {
558 return exec_err!(
559 "array_slice got invalid stride: {:?}, it cannot be 0",
560 stride_value
561 );
562 }
563
564 if (from < to && stride_value.is_negative())
565 || (from > to && stride_value.is_positive())
566 {
567 return Ok(SlicePlan::Empty);
568 }
569
570 let stride: O = stride_value.try_into().map_err(|_| {
571 internal_datafusion_err!("array_slice got invalid stride: {}", stride_value)
572 })?;
573
574 if from <= to && stride_value.is_positive() {
575 if stride_value == 1 {
576 let len = to - from + O::usize_as(1);
577 Ok(SlicePlan::Contiguous { start: from, len })
578 } else {
579 let mut indices = Vec::new();
580 let mut index = from;
581 while index <= to {
582 indices.push(index);
583 index += stride;
584 }
585 Ok(SlicePlan::Indices(indices))
586 }
587 } else {
588 let mut indices = Vec::new();
589 let mut index = from;
590 while index >= to {
591 indices.push(index);
592 index += stride;
593 }
594 Ok(SlicePlan::Indices(indices))
595 }
596}
597
598fn combine_input_nulls(
600 array: &dyn Array,
601 from_array: &Int64Array,
602 to_array: &Int64Array,
603 stride: Option<&Int64Array>,
604) -> Option<NullBuffer> {
605 NullBuffer::union_many([
606 array.nulls(),
607 from_array.nulls(),
608 to_array.nulls(),
609 stride.and_then(|s| s.nulls()),
610 ])
611}
612
613fn general_array_slice<O: OffsetSizeTrait>(
614 array: &GenericListArray<O>,
615 from_array: &Int64Array,
616 to_array: &Int64Array,
617 stride: Option<&Int64Array>,
618) -> Result<ArrayRef>
619where
620 i64: TryInto<O>,
621{
622 let values = array.values();
623 let original_data = values.to_data();
624 let capacity = Capacities::Array(original_data.len());
625
626 let mut mutable =
627 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
628
629 let mut offsets = vec![O::usize_as(0)];
633
634 let nulls = combine_input_nulls(array, from_array, to_array, stride);
635
636 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
637 let start = offset_window[0];
638 let end = offset_window[1];
639 let len = end - start;
640
641 if nulls.as_ref().is_some_and(|n| n.is_null(row_index)) {
642 mutable.extend_nulls(1);
643 offsets.push(offsets[row_index] + O::usize_as(1));
644 continue;
645 }
646
647 if len == O::usize_as(0) {
649 offsets.push(offsets[row_index]);
650 continue;
651 }
652
653 let slice_plan = compute_slice_plan::<O>(
654 len,
655 from_array.value(row_index),
656 to_array.value(row_index),
657 stride.map(|s| s.value(row_index)),
658 )?;
659
660 match slice_plan {
661 SlicePlan::Empty => offsets.push(offsets[row_index]),
662 SlicePlan::Contiguous {
663 start: rel_start,
664 len: slice_len,
665 } => {
666 let start_index = (start + rel_start).to_usize().unwrap();
667 let end_index = (start + rel_start + slice_len).to_usize().unwrap();
668 mutable.extend(0, start_index, end_index);
669 offsets.push(offsets[row_index] + slice_len);
670 }
671 SlicePlan::Indices(indices) => {
672 let count = indices.len();
673 for rel_index in indices {
674 let absolute_index = (start + rel_index).to_usize().unwrap();
675 mutable.extend(0, absolute_index, absolute_index + 1);
676 }
677 offsets.push(offsets[row_index] + O::usize_as(count));
678 }
679 }
680 }
681
682 let data = mutable.freeze();
683
684 Ok(Arc::new(GenericListArray::<O>::try_new(
685 Arc::new(Field::new_list_field(array.value_type(), true)),
686 OffsetBuffer::<O>::new(offsets.into()),
687 arrow::array::make_array(data),
688 nulls,
689 )?))
690}
691
692fn general_list_view_array_slice<O: OffsetSizeTrait>(
693 array: &GenericListViewArray<O>,
694 from_array: &Int64Array,
695 to_array: &Int64Array,
696 stride: Option<&Int64Array>,
697) -> Result<ArrayRef>
698where
699 i64: TryInto<O>,
700{
701 let values = array.values();
702 let original_data = values.to_data();
703 let capacity = Capacities::Array(original_data.len());
704 let field = match array.data_type() {
705 ListView(field) | LargeListView(field) => Arc::clone(field),
706 other => {
707 return internal_err!("array_slice got unexpected data type: {}", other);
708 }
709 };
710
711 let mut mutable =
712 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
713
714 let mut offsets = Vec::with_capacity(array.len());
717 let mut sizes = Vec::with_capacity(array.len());
718 let mut current_offset = O::usize_as(0);
719
720 let nulls = combine_input_nulls(array, from_array, to_array, stride);
721
722 for row_index in 0..array.len() {
723 if nulls.as_ref().is_some_and(|n| n.is_null(row_index)) {
724 offsets.push(current_offset);
725 sizes.push(O::usize_as(0));
726 continue;
727 }
728
729 let len = array.value_size(row_index);
730
731 if len == O::usize_as(0) {
733 offsets.push(current_offset);
734 sizes.push(O::usize_as(0));
735 continue;
736 }
737
738 let slice_plan = compute_slice_plan::<O>(
739 len,
740 from_array.value(row_index),
741 to_array.value(row_index),
742 stride.map(|s| s.value(row_index)),
743 )?;
744
745 let start = array.value_offset(row_index);
746 match slice_plan {
747 SlicePlan::Empty => {
748 offsets.push(current_offset);
749 sizes.push(O::usize_as(0));
750 }
751 SlicePlan::Contiguous {
752 start: rel_start,
753 len: slice_len,
754 } => {
755 let start_index = (start + rel_start).to_usize().unwrap();
756 let end_index = (start + rel_start + slice_len).to_usize().unwrap();
757 mutable.extend(0, start_index, end_index);
758 offsets.push(current_offset);
759 sizes.push(slice_len);
760 current_offset += slice_len;
761 }
762 SlicePlan::Indices(indices) => {
763 let count = indices.len();
764 for rel_index in indices {
765 let absolute_index = (start + rel_index).to_usize().unwrap();
766 mutable.extend(0, absolute_index, absolute_index + 1);
767 }
768 let length = O::usize_as(count);
769 offsets.push(current_offset);
770 sizes.push(length);
771 current_offset += length;
772 }
773 }
774 }
775
776 let data = mutable.freeze();
777
778 Ok(Arc::new(GenericListViewArray::<O>::try_new(
779 field,
780 ScalarBuffer::from(offsets),
781 ScalarBuffer::from(sizes),
782 arrow::array::make_array(data),
783 nulls,
784 )?))
785}
786
/// Scalar UDF behind `array_pop_front(array)`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns the array without the first element.",
    syntax_example = "array_pop_front(array)",
    sql_example = r#"```sql
> select array_pop_front([1, 2, 3]);
+-------------------------------+
| array_pop_front(List([1,2,3])) |
+-------------------------------+
| [2, 3] |
+-------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct ArrayPopFront {
    // Accepted argument types; handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative function names; handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
809
810impl ArrayPopFront {
811 pub fn new() -> Self {
812 Self {
813 signature: Signature::array(Volatility::Immutable),
814 aliases: vec![String::from("list_pop_front")],
815 }
816 }
817}
818
819impl ScalarUDFImpl for ArrayPopFront {
820 fn name(&self) -> &str {
821 "array_pop_front"
822 }
823
824 fn signature(&self) -> &Signature {
825 &self.signature
826 }
827
828 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
829 Ok(arg_types[0].clone())
830 }
831
832 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
833 make_scalar_function(array_pop_front_inner)(&args.args)
834 }
835
836 fn aliases(&self) -> &[String] {
837 &self.aliases
838 }
839
840 fn documentation(&self) -> Option<&Documentation> {
841 self.doc()
842 }
843}
844
845fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
847 let array_data_type = args[0].data_type();
848 match array_data_type {
849 List(_) => {
850 let array = as_list_array(&args[0])?;
851 general_pop_front_list::<i32>(array)
852 }
853 LargeList(_) => {
854 let array = as_large_list_array(&args[0])?;
855 general_pop_front_list::<i64>(array)
856 }
857 _ => exec_err!("array_pop_front does not support type: {}", array_data_type),
858 }
859}
860
861fn general_pop_front_list<O: OffsetSizeTrait>(
862 array: &GenericListArray<O>,
863) -> Result<ArrayRef>
864where
865 i64: TryInto<O>,
866{
867 let from_array = Int64Array::from(vec![2; array.len()]);
868 let to_array = Int64Array::from(
869 array
870 .iter()
871 .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
872 .collect::<Vec<i64>>(),
873 );
874 general_array_slice::<O>(array, &from_array, &to_array, None)
875}
876
/// Scalar UDF behind `array_pop_back(array)`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns the array without the last element.",
    syntax_example = "array_pop_back(array)",
    sql_example = r#"```sql
> select array_pop_back([1, 2, 3]);
+-------------------------------+
| array_pop_back(List([1,2,3])) |
+-------------------------------+
| [1, 2] |
+-------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub(super) struct ArrayPopBack {
    // Accepted argument types; handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative function names; handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
899
900impl ArrayPopBack {
901 pub fn new() -> Self {
902 Self {
903 signature: Signature::array(Volatility::Immutable),
904 aliases: vec![String::from("list_pop_back")],
905 }
906 }
907}
908
909impl ScalarUDFImpl for ArrayPopBack {
910 fn name(&self) -> &str {
911 "array_pop_back"
912 }
913
914 fn signature(&self) -> &Signature {
915 &self.signature
916 }
917
918 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
919 Ok(arg_types[0].clone())
920 }
921
922 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
923 make_scalar_function(array_pop_back_inner)(&args.args)
924 }
925
926 fn aliases(&self) -> &[String] {
927 &self.aliases
928 }
929
930 fn documentation(&self) -> Option<&Documentation> {
931 self.doc()
932 }
933}
934
935fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
937 let [array] = take_function_args("array_pop_back", args)?;
938
939 match array.data_type() {
940 List(_) => {
941 let array = as_list_array(&array)?;
942 general_pop_back_list::<i32>(array)
943 }
944 LargeList(_) => {
945 let array = as_large_list_array(&array)?;
946 general_pop_back_list::<i64>(array)
947 }
948 _ => exec_err!(
949 "array_pop_back does not support type: {}",
950 array.data_type()
951 ),
952 }
953}
954
955fn general_pop_back_list<O: OffsetSizeTrait>(
956 array: &GenericListArray<O>,
957) -> Result<ArrayRef>
958where
959 i64: TryInto<O>,
960{
961 let from_array = Int64Array::from(vec![1; array.len()]);
962 let to_array = Int64Array::from(
963 array
964 .iter()
965 .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
966 .collect::<Vec<i64>>(),
967 );
968 general_array_slice::<O>(array, &from_array, &to_array, None)
969}
970
971#[user_doc(
972 doc_section(label = "Array Functions"),
973 description = "Returns the first non-null element in the array.",
974 syntax_example = "array_any_value(array)",
975 sql_example = r#"```sql
976> select array_any_value([NULL, 1, 2, 3]);
977+-------------------------------+
978| array_any_value(List([NULL,1,2,3])) |
979+-------------------------------------+
980| 1 |
981+-------------------------------------+
982```"#,
983 argument(
984 name = "array",
985 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
986 )
987)]
988#[derive(Debug, PartialEq, Eq, Hash)]
989pub(super) struct ArrayAnyValue {
990 signature: Signature,
991 aliases: Vec<String>,
992}
993
994impl ArrayAnyValue {
995 pub fn new() -> Self {
996 Self {
997 signature: Signature::array(Volatility::Immutable),
998 aliases: vec![String::from("list_any_value")],
999 }
1000 }
1001}
1002
1003impl ScalarUDFImpl for ArrayAnyValue {
1004 fn name(&self) -> &str {
1005 "array_any_value"
1006 }
1007 fn signature(&self) -> &Signature {
1008 &self.signature
1009 }
1010 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1011 match &arg_types[0] {
1012 List(field) | LargeList(field) | FixedSizeList(field, _) => {
1013 Ok(field.data_type().clone())
1014 }
1015 _ => plan_err!(
1016 "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
1017 ),
1018 }
1019 }
1020
1021 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1022 make_scalar_function(array_any_value_inner)(&args.args)
1023 }
1024
1025 fn aliases(&self) -> &[String] {
1026 &self.aliases
1027 }
1028
1029 fn documentation(&self) -> Option<&Documentation> {
1030 self.doc()
1031 }
1032}
1033
1034fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
1035 let [array] = take_function_args("array_any_value", args)?;
1036
1037 match &array.data_type() {
1038 List(_) => {
1039 let array = as_list_array(&array)?;
1040 general_array_any_value::<i32>(array)
1041 }
1042 LargeList(_) => {
1043 let array = as_large_list_array(&array)?;
1044 general_array_any_value::<i64>(array)
1045 }
1046 data_type => exec_err!("array_any_value does not support type: {data_type}"),
1047 }
1048}
1049
1050fn general_array_any_value<O: OffsetSizeTrait>(
1051 array: &GenericListArray<O>,
1052) -> Result<ArrayRef>
1053where
1054 i64: TryInto<O>,
1055{
1056 let values = array.values();
1057 let original_data = values.to_data();
1058 let capacity = Capacities::Array(array.len());
1059
1060 let mut mutable =
1061 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
1062
1063 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
1064 let start = offset_window[0];
1065
1066 if array.is_null(row_index) {
1068 mutable.extend_nulls(1);
1069 continue;
1070 }
1071
1072 let row_value = array.value(row_index);
1073 match row_value.nulls() {
1074 Some(row_nulls_buffer) => {
1075 if let Some(first_non_null_index) =
1077 row_nulls_buffer.valid_indices().next()
1078 {
1079 let index = start.as_usize() + first_non_null_index;
1080 mutable.extend(0, index, index + 1)
1081 } else {
1082 mutable.extend_nulls(1);
1084 }
1085 }
1086 None => {
1087 let index = start.as_usize();
1089 mutable.extend(0, index, index + 1);
1090 }
1091 }
1092 }
1093
1094 let data = mutable.freeze();
1095 Ok(arrow::array::make_array(data))
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100 use super::{
1101 array_element_udf, general_array_any_value, general_array_element,
1102 general_list_view_array_slice,
1103 };
1104 use arrow::array::{
1105 Array, ArrayRef, GenericListViewArray, Int32Array, Int64Array, ListViewArray,
1106 cast::AsArray,
1107 };
1108 use arrow::array::{ListArray, RecordBatch};
1109 use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
1110 use arrow::datatypes::{DataType, Field};
1111 use datafusion_common::{Column, DFSchema, Result, assert_batches_eq};
1112 use datafusion_expr::expr::ScalarFunction;
1113 use datafusion_expr::{Expr, ExprSchemable};
1114 use std::collections::HashMap;
1115 use std::sync::Arc;
1116
1117 fn list_view_values(array: &GenericListViewArray<i32>) -> Vec<Vec<i32>> {
1118 (0..array.len())
1119 .map(|i| {
1120 let child = array.value(i);
1121 let values = child.as_any().downcast_ref::<Int32Array>().unwrap();
1122 values.iter().map(|v| v.unwrap()).collect()
1123 })
1124 .collect()
1125 }
1126
1127 #[test]
1129 fn test_array_element_return_type_fixed_size_list() {
1130 let fixed_size_list_type = DataType::FixedSizeList(
1131 Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
1132 13,
1133 );
1134 let array_type = DataType::List(
1135 Field::new_list_field(fixed_size_list_type.clone(), true).into(),
1136 );
1137 let index_type = DataType::Int64;
1138
1139 let schema = DFSchema::from_unqualified_fields(
1140 vec![
1141 Field::new("my_array", array_type.clone(), false),
1142 Field::new("my_index", index_type.clone(), false),
1143 ]
1144 .into(),
1145 HashMap::default(),
1146 )
1147 .unwrap();
1148
1149 let udf = array_element_udf();
1150
1151 assert_eq!(
1153 udf.return_type(&[array_type.clone(), index_type.clone()])
1154 .unwrap(),
1155 fixed_size_list_type
1156 );
1157
1158 let udf_expr = Expr::ScalarFunction(ScalarFunction {
1160 func: array_element_udf(),
1161 args: vec![
1162 Expr::Column(Column::new_unqualified("my_array")),
1163 Expr::Column(Column::new_unqualified("my_index")),
1164 ],
1165 });
1166 assert_eq!(
1167 ExprSchemable::get_type(&udf_expr, &schema).unwrap(),
1168 fixed_size_list_type
1169 );
1170 }
1171
1172 #[test]
1173 fn test_array_element_null_handling() -> Result<()> {
1174 let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1175 let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 4, 5]));
1176 let nulls = NullBuffer::from(vec![true, false, true]);
1177 let field = Arc::new(Field::new("item", DataType::Int32, true));
1178
1179 let list_array = ListArray::new(field, offsets, values, Some(nulls));
1180 let indexes = Int64Array::from(vec![1, 1, 1]);
1181
1182 let result = general_array_element(&list_array, &indexes)?;
1183
1184 let expected = [
1185 "+--------+",
1186 "| result |",
1187 "+--------+",
1188 "| 1 |",
1189 "| |",
1190 "| 5 |",
1191 "+--------+",
1192 ];
1193
1194 let batch = RecordBatch::try_from_iter([("result", result)])?;
1195
1196 assert_batches_eq!(expected, &[batch]);
1197
1198 Ok(())
1199 }
1200
1201 #[test]
1202 fn test_array_any_null_handling() -> Result<()> {
1203 let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1204 let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 4, 5]));
1205 let nulls = NullBuffer::from(vec![true, false, true]);
1206 let field = Arc::new(Field::new("item", DataType::Int32, true));
1207
1208 let list_array = ListArray::new(field, offsets, values, Some(nulls));
1209
1210 let result = general_array_any_value(&list_array)?;
1211
1212 assert!(!result.is_null(0));
1213 assert!(result.is_null(1));
1214 assert!(!result.is_null(2));
1215
1216 Ok(())
1217 }
1218
/// Slices a two-row list view ([1, 2, 3] and [4, 5]) without a stride and
/// checks the resulting sub-lists.
#[test]
fn test_array_slice_list_view_basic() -> Result<()> {
    let data: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let list_view = ListViewArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        ScalarBuffer::from(vec![0, 3]),
        ScalarBuffer::from(vec![3, 2]),
        data,
        None,
    );

    // Per-row slice bounds: row 0 takes 2..=3, row 1 takes 1..=2.
    let lower = Int64Array::from(vec![2, 1]);
    let upper = Int64Array::from(vec![3, 2]);

    let sliced = general_list_view_array_slice::<i32>(&list_view, &lower, &upper, None)?;
    let sliced_view = sliced.as_ref().as_list_view::<i32>();

    assert_eq!(
        list_view_values(sliced_view),
        vec![vec![2, 3], vec![4, 5]]
    );
    Ok(())
}
1241
/// Slices a list view whose offsets are not increasing (row 0 starts at
/// value index 3, row 1 at index 0) and checks each row is sliced from its
/// own window.
#[test]
fn test_array_slice_list_view_non_monotonic_offsets() -> Result<()> {
    let data: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    // Rows: [4, 5] (offset 3, size 2) and [1, 2, 3] (offset 0, size 3).
    let list_view = ListViewArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        ScalarBuffer::from(vec![3, 0]),
        ScalarBuffer::from(vec![2, 3]),
        data,
        None,
    );

    let lower = Int64Array::from(vec![1, 1]);
    let upper = Int64Array::from(vec![2, 2]);

    let sliced = general_list_view_array_slice::<i32>(&list_view, &lower, &upper, None)?;
    let sliced_view = sliced.as_ref().as_list_view::<i32>();

    assert_eq!(
        list_view_values(sliced_view),
        vec![vec![4, 5], vec![1, 2]]
    );
    Ok(())
}
1265
/// Slices with a stride of -1 and `from` greater than `to`, expecting each
/// row's selected elements in reverse order.
#[test]
fn test_array_slice_list_view_negative_stride() -> Result<()> {
    let data: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    // Rows: [1, 2, 3] and [4, 5].
    let list_view = ListViewArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        ScalarBuffer::from(vec![0, 3]),
        ScalarBuffer::from(vec![3, 2]),
        data,
        None,
    );

    let start = Int64Array::from(vec![3, 2]);
    let stop = Int64Array::from(vec![1, 1]);
    let step = Int64Array::from(vec![-1, -1]);

    let sliced =
        general_list_view_array_slice::<i32>(&list_view, &start, &stop, Some(&step))?;
    let sliced_view = sliced.as_ref().as_list_view::<i32>();

    assert_eq!(
        list_view_values(sliced_view),
        vec![vec![3, 2, 1], vec![5, 4]]
    );
    Ok(())
}
1285
/// Reverse-slices a list view whose rows are laid out in descending offset
/// order; the single-element last row is expected to come back empty for the
/// requested 2 -> 1 range.
#[test]
fn test_array_slice_list_view_out_of_order() -> Result<()> {
    let data: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let list_view = ListViewArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        ScalarBuffer::from(vec![3, 1, 0]),
        ScalarBuffer::from(vec![2, 2, 1]),
        data,
        None,
    );
    // Sanity-check the input layout before slicing.
    assert_eq!(
        list_view_values(&list_view),
        vec![vec![4, 5], vec![2, 3], vec![1]]
    );

    let start = Int64Array::from(vec![2, 2, 2]);
    let stop = Int64Array::from(vec![1, 1, 1]);
    let step = Int64Array::from(vec![-1, -1, -1]);

    let sliced =
        general_list_view_array_slice::<i32>(&list_view, &start, &stop, Some(&step))?;
    let sliced_view = sliced.as_ref().as_list_view::<i32>();

    assert_eq!(
        list_view_values(sliced_view),
        vec![vec![5, 4], vec![3, 2], vec![]]
    );
    Ok(())
}
1312
/// Exercises list-view slicing with NULL elements inside the values and with
/// a NULL entry in the stride argument.
#[test]
fn test_array_slice_list_view_with_nulls() -> Result<()> {
    // Decodes one row of an Int32 list view into owned optional values.
    let row_values = |list: &ListViewArray, row: usize| -> Vec<Option<i32>> {
        list.value(row)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .iter()
            .collect()
    };

    // Rows: [1, NULL], [3, 4, 5], [] (zero-sized view).
    let data: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        Some(3),
        Some(4),
        Some(5),
    ]));
    let list_view = ListViewArray::new(
        Arc::new(Field::new("item", DataType::Int32, true)),
        ScalarBuffer::from(vec![0, 2, 5]),
        ScalarBuffer::from(vec![2, 3, 0]),
        data,
        None,
    );

    let lower = Int64Array::from(vec![1, 1, 1]);
    let upper = Int64Array::from(vec![2, 2, 1]);

    // First pass: no stride; NULL elements are carried through the slice.
    let sliced = general_list_view_array_slice::<i32>(&list_view, &lower, &upper, None)?;
    let sliced_view = sliced.as_ref().as_list_view::<i32>();

    let actual: Vec<Vec<Option<i32>>> = (0..sliced_view.len())
        .map(|row| row_values(sliced_view, row))
        .collect();

    assert_eq!(
        actual,
        vec![vec![Some(1), None], vec![Some(3), Some(4)], Vec::new(),]
    );

    // Second pass: a NULL stride on row 1 must make that output row NULL
    // while the other rows stay valid.
    let stride_with_null = Int64Array::from(vec![Some(1), None, Some(1)]);
    let strided = general_list_view_array_slice::<i32>(
        &list_view,
        &lower,
        &upper,
        Some(&stride_with_null),
    )?;
    let strided_view = strided.as_ref().as_list_view::<i32>();

    assert!(!strided_view.is_null(0));
    assert!(strided_view.is_null(1));
    assert!(!strided_view.is_null(2));

    let first_row = row_values(strided_view, 0);
    assert_eq!(first_row, vec![Some(1), None]);

    Ok(())
}
1378}