1use arrow::array::{
21 Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array,
22 MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27 DataType::{FixedSizeList, LargeList, List, Null},
28 Field,
29};
30use datafusion_common::cast::as_int64_array;
31use datafusion_common::cast::as_large_list_array;
32use datafusion_common::cast::as_list_array;
33use datafusion_common::utils::ListCoercion;
34use datafusion_common::{
35 exec_datafusion_err, exec_err, internal_datafusion_err, plan_err,
36 utils::take_function_args, Result,
37};
38use datafusion_expr::{
39 ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
40};
41use datafusion_expr::{
42 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use std::any::Any;
46use std::sync::Arc;
47
48use crate::utils::make_scalar_function;
49
// `ArrayElement`: expression helper `array_element` plus the
// `array_element_udf` accessor (the accessor is called by the tests at the
// bottom of this file).
// NOTE(review): the exact expansion lives in the `make_udf_expr_and_func!`
// definition — confirm there.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `ArraySlice`: presumably only the `array_slice_udf` accessor is generated
// here; the `array_slice` expression helper is hand-written below because its
// `stride` argument is optional.
create_func!(ArraySlice, array_slice_udf);

// `ArrayPopFront`: expression helper `array_pop_front` plus the
// `array_pop_front_udf` accessor.
make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

// `ArrayPopBack`: expression helper `array_pop_back` plus the
// `array_pop_back_udf` accessor.
make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

// `ArrayAnyValue`: expression helper `array_any_value` plus the
// `array_any_value_udf` accessor.
make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
84
85#[user_doc(
86 doc_section(label = "Array Functions"),
87 description = "Extracts the element with the index n from the array.",
88 syntax_example = "array_element(array, index)",
89 sql_example = r#"```sql
90> select array_element([1, 2, 3, 4], 3);
91+-----------------------------------------+
92| array_element(List([1,2,3,4]),Int64(3)) |
93+-----------------------------------------+
94| 3 |
95+-----------------------------------------+
96```"#,
97 argument(
98 name = "array",
99 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
100 ),
101 argument(
102 name = "index",
103 description = "Index to extract the element from the array."
104 )
105)]
106#[derive(Debug, PartialEq, Eq, Hash)]
107pub struct ArrayElement {
108 signature: Signature,
109 aliases: Vec<String>,
110}
111
112impl Default for ArrayElement {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl ArrayElement {
119 pub fn new() -> Self {
120 Self {
121 signature: Signature::array_and_index(Volatility::Immutable),
122 aliases: vec![
123 String::from("array_extract"),
124 String::from("list_element"),
125 String::from("list_extract"),
126 ],
127 }
128 }
129}
130
131impl ScalarUDFImpl for ArrayElement {
132 fn as_any(&self) -> &dyn Any {
133 self
134 }
135 fn name(&self) -> &str {
136 "array_element"
137 }
138
139 fn display_name(&self, args: &[Expr]) -> Result<String> {
140 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141 if args_name.len() != 2 {
142 return exec_err!("expect 2 args, got {}", args_name.len());
143 }
144
145 Ok(format!("{}[{}]", args_name[0], args_name[1]))
146 }
147
148 fn schema_name(&self, args: &[Expr]) -> Result<String> {
149 let args_name = args
150 .iter()
151 .map(|e| e.schema_name().to_string())
152 .collect::<Vec<_>>();
153 if args_name.len() != 2 {
154 return exec_err!("expect 2 args, got {}", args_name.len());
155 }
156
157 Ok(format!("{}[{}]", args_name[0], args_name[1]))
158 }
159
160 fn signature(&self) -> &Signature {
161 &self.signature
162 }
163
164 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165 match &arg_types[0] {
166 Null => Ok(Null),
167 List(field) | LargeList(field) => Ok(field.data_type().clone()),
168 arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
169 }
170 }
171
172 fn invoke_with_args(
173 &self,
174 args: datafusion_expr::ScalarFunctionArgs,
175 ) -> Result<ColumnarValue> {
176 make_scalar_function(array_element_inner)(&args.args)
177 }
178
179 fn aliases(&self) -> &[String] {
180 &self.aliases
181 }
182
183 fn documentation(&self) -> Option<&Documentation> {
184 self.doc()
185 }
186}
187
188fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
196 let [array, indexes] = take_function_args("array_element", args)?;
197
198 match &array.data_type() {
199 Null => Ok(Arc::new(NullArray::new(array.len()))),
200 List(_) => {
201 let array = as_list_array(&array)?;
202 let indexes = as_int64_array(&indexes)?;
203 general_array_element::<i32>(array, indexes)
204 }
205 LargeList(_) => {
206 let array = as_large_list_array(&array)?;
207 let indexes = as_int64_array(&indexes)?;
208 general_array_element::<i64>(array, indexes)
209 }
210 arg_type => {
211 exec_err!("array_element does not support type {arg_type}")
212 }
213 }
214}
215
216fn general_array_element<O: OffsetSizeTrait>(
217 array: &GenericListArray<O>,
218 indexes: &Int64Array,
219) -> Result<ArrayRef>
220where
221 i64: TryInto<O>,
222{
223 let values = array.values();
224 if values.data_type().is_null() {
225 return Ok(Arc::new(NullArray::new(array.len())));
226 }
227
228 let original_data = values.to_data();
229 let capacity = Capacities::Array(original_data.len());
230
231 let mut mutable =
233 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
234
235 fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
236 where
237 i64: TryInto<O>,
238 {
239 let index: O = index.try_into().map_err(|_| {
240 exec_datafusion_err!("array_element got invalid index: {index}")
241 })?;
242 let adjusted_zero_index = if index < O::usize_as(0) {
244 index + len
245 } else {
246 index - O::usize_as(1)
247 };
248
249 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
250 Ok(Some(adjusted_zero_index))
251 } else {
252 Ok(None)
254 }
255 }
256
257 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
258 let start = offset_window[0];
259 let end = offset_window[1];
260 let len = end - start;
261
262 if len == O::usize_as(0) {
264 mutable.extend_nulls(1);
265 continue;
266 }
267
268 let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
269
270 if let Some(index) = index {
271 let start = start.as_usize() + index.as_usize();
272 mutable.extend(0, start, start + 1_usize);
273 } else {
274 mutable.extend_nulls(1);
276 }
277 }
278
279 let data = mutable.freeze();
280 Ok(arrow::array::make_array(data))
281}
282
283#[doc = "returns a slice of the array."]
284pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
285 let args = match stride {
286 Some(stride) => vec![array, begin, end, stride],
287 None => vec![array, begin, end],
288 };
289 array_slice_udf().call(args)
290}
291
292#[user_doc(
293 doc_section(label = "Array Functions"),
294 description = "Returns a slice of the array based on 1-indexed start and end positions.",
295 syntax_example = "array_slice(array, begin, end)",
296 sql_example = r#"```sql
297> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
298+--------------------------------------------------------+
299| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
300+--------------------------------------------------------+
301| [3, 4, 5, 6] |
302+--------------------------------------------------------+
303```"#,
304 argument(
305 name = "array",
306 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
307 ),
308 argument(
309 name = "begin",
310 description = "Index of the first element. If negative, it counts backward from the end of the array."
311 ),
312 argument(
313 name = "end",
314 description = "Index of the last element. If negative, it counts backward from the end of the array."
315 ),
316 argument(
317 name = "stride",
318 description = "Stride of the array slice. The default is 1."
319 )
320)]
321#[derive(Debug, PartialEq, Eq, Hash)]
322pub(super) struct ArraySlice {
323 signature: Signature,
324 aliases: Vec<String>,
325}
326
327impl ArraySlice {
328 pub fn new() -> Self {
329 Self {
330 signature: Signature::one_of(
331 vec![
332 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
333 arguments: vec![
334 ArrayFunctionArgument::Array,
335 ArrayFunctionArgument::Index,
336 ArrayFunctionArgument::Index,
337 ],
338 array_coercion: Some(ListCoercion::FixedSizedListToList),
339 }),
340 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
341 arguments: vec![
342 ArrayFunctionArgument::Array,
343 ArrayFunctionArgument::Index,
344 ArrayFunctionArgument::Index,
345 ArrayFunctionArgument::Index,
346 ],
347 array_coercion: Some(ListCoercion::FixedSizedListToList),
348 }),
349 ],
350 Volatility::Immutable,
351 ),
352 aliases: vec![String::from("list_slice")],
353 }
354 }
355}
356
357impl ScalarUDFImpl for ArraySlice {
358 fn as_any(&self) -> &dyn Any {
359 self
360 }
361
362 fn display_name(&self, args: &[Expr]) -> Result<String> {
363 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
364 if let Some((arr, indexes)) = args_name.split_first() {
365 Ok(format!("{arr}[{}]", indexes.join(":")))
366 } else {
367 exec_err!("no argument")
368 }
369 }
370
371 fn schema_name(&self, args: &[Expr]) -> Result<String> {
372 let args_name = args
373 .iter()
374 .map(|e| e.schema_name().to_string())
375 .collect::<Vec<_>>();
376 if let Some((arr, indexes)) = args_name.split_first() {
377 Ok(format!("{arr}[{}]", indexes.join(":")))
378 } else {
379 exec_err!("no argument")
380 }
381 }
382
383 fn name(&self) -> &str {
384 "array_slice"
385 }
386
387 fn signature(&self) -> &Signature {
388 &self.signature
389 }
390
391 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
392 Ok(arg_types[0].clone())
393 }
394
395 fn invoke_with_args(
396 &self,
397 args: datafusion_expr::ScalarFunctionArgs,
398 ) -> Result<ColumnarValue> {
399 make_scalar_function(array_slice_inner)(&args.args)
400 }
401
402 fn aliases(&self) -> &[String] {
403 &self.aliases
404 }
405
406 fn documentation(&self) -> Option<&Documentation> {
407 self.doc()
408 }
409}
410
411fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
428 let args_len = args.len();
429 if args_len != 3 && args_len != 4 {
430 return exec_err!("array_slice needs three or four arguments");
431 }
432
433 let stride = if args_len == 4 {
434 Some(as_int64_array(&args[3])?)
435 } else {
436 None
437 };
438
439 let from_array = as_int64_array(&args[1])?;
440 let to_array = as_int64_array(&args[2])?;
441
442 let array_data_type = args[0].data_type();
443 match array_data_type {
444 List(_) => {
445 let array = as_list_array(&args[0])?;
446 general_array_slice::<i32>(array, from_array, to_array, stride)
447 }
448 LargeList(_) => {
449 let array = as_large_list_array(&args[0])?;
450 general_array_slice::<i64>(array, from_array, to_array, stride)
451 }
452 _ => exec_err!("array_slice does not support type: {}", array_data_type),
453 }
454}
455
456fn general_array_slice<O: OffsetSizeTrait>(
457 array: &GenericListArray<O>,
458 from_array: &Int64Array,
459 to_array: &Int64Array,
460 stride: Option<&Int64Array>,
461) -> Result<ArrayRef>
462where
463 i64: TryInto<O>,
464{
465 let values = array.values();
466 let original_data = values.to_data();
467 let capacity = Capacities::Array(original_data.len());
468
469 let mut mutable =
470 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
471
472 fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
476 where
477 i64: TryInto<O>,
478 {
479 let adjusted_zero_index = if index < 0 {
481 if let Ok(index) = index.try_into() {
482 if index < (O::zero() - O::one()) * len {
489 O::zero()
490 } else {
491 index + len
492 }
493 } else {
494 return exec_err!("array_slice got invalid index: {}", index);
495 }
496 } else {
497 if let Ok(index) = index.try_into() {
499 std::cmp::max(index - O::usize_as(1), O::usize_as(0))
500 } else {
501 return exec_err!("array_slice got invalid index: {}", index);
502 }
503 };
504
505 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
506 Ok(Some(adjusted_zero_index))
507 } else {
508 Ok(None)
510 }
511 }
512
513 fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
514 where
515 i64: TryInto<O>,
516 {
517 let adjusted_zero_index = if index < 0 {
519 if let Ok(index) = index.try_into() {
521 index + len
522 } else {
523 return exec_err!("array_slice got invalid index: {}", index);
524 }
525 } else {
526 if let Ok(index) = index.try_into() {
528 std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
529 } else {
530 return exec_err!("array_slice got invalid index: {}", index);
531 }
532 };
533
534 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
535 Ok(Some(adjusted_zero_index))
536 } else {
537 Ok(None)
539 }
540 }
541
542 let mut offsets = vec![O::usize_as(0)];
543 let mut null_builder = NullBufferBuilder::new(array.len());
544
545 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
546 let start = offset_window[0];
547 let end = offset_window[1];
548 let len = end - start;
549
550 if array.is_null(row_index)
552 || from_array.is_null(row_index)
553 || to_array.is_null(row_index)
554 {
555 mutable.extend_nulls(1);
556 offsets.push(offsets[row_index] + O::usize_as(1));
557 null_builder.append_null();
558 continue;
559 }
560 null_builder.append_non_null();
561
562 if len == O::usize_as(0) {
564 offsets.push(offsets[row_index]);
565 continue;
566 }
567
568 let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
569 let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
570
571 if let (Some(from), Some(to)) = (from_index, to_index) {
572 let stride = stride.map(|s| s.value(row_index));
573 let stride = stride.unwrap_or(1);
575 if stride.is_zero() {
576 return exec_err!(
577 "array_slice got invalid stride: {:?}, it cannot be 0",
578 stride
579 );
580 } else if (from < to && stride.is_negative())
581 || (from > to && stride.is_positive())
582 {
583 offsets.push(offsets[row_index]);
585 continue;
586 }
587
588 let stride: O = stride.try_into().map_err(|_| {
589 internal_datafusion_err!("array_slice got invalid stride: {}", stride)
590 })?;
591
592 if from <= to && stride > O::zero() {
593 assert!(start + to <= end);
594 if stride.eq(&O::one()) {
595 mutable.extend(
597 0,
598 (start + from).to_usize().unwrap(),
599 (start + to + O::usize_as(1)).to_usize().unwrap(),
600 );
601 offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
602 continue;
603 }
604 let mut index = start + from;
605 let mut cnt = 0;
606 while index <= start + to {
607 mutable.extend(
608 0,
609 index.to_usize().unwrap(),
610 index.to_usize().unwrap() + 1,
611 );
612 index += stride;
613 cnt += 1;
614 }
615 offsets.push(offsets[row_index] + O::usize_as(cnt));
616 } else {
617 let mut index = start + from;
618 let mut cnt = 0;
619 while index >= start + to {
620 mutable.extend(
621 0,
622 index.to_usize().unwrap(),
623 index.to_usize().unwrap() + 1,
624 );
625 index += stride;
626 cnt += 1;
627 }
628 offsets.push(offsets[row_index] + O::usize_as(cnt));
630 }
631 } else {
632 offsets.push(offsets[row_index]);
634 }
635 }
636
637 let data = mutable.freeze();
638
639 Ok(Arc::new(GenericListArray::<O>::try_new(
640 Arc::new(Field::new_list_field(array.value_type(), true)),
641 OffsetBuffer::<O>::new(offsets.into()),
642 arrow::array::make_array(data),
643 null_builder.finish(),
644 )?))
645}
646
647#[user_doc(
648 doc_section(label = "Array Functions"),
649 description = "Returns the array without the first element.",
650 syntax_example = "array_pop_front(array)",
651 sql_example = r#"```sql
652> select array_pop_front([1, 2, 3]);
653+-------------------------------+
654| array_pop_front(List([1,2,3])) |
655+-------------------------------+
656| [2, 3] |
657+-------------------------------+
658```"#,
659 argument(
660 name = "array",
661 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
662 )
663)]
664#[derive(Debug, PartialEq, Eq, Hash)]
665pub(super) struct ArrayPopFront {
666 signature: Signature,
667 aliases: Vec<String>,
668}
669
670impl ArrayPopFront {
671 pub fn new() -> Self {
672 Self {
673 signature: Signature::array(Volatility::Immutable),
674 aliases: vec![String::from("list_pop_front")],
675 }
676 }
677}
678
679impl ScalarUDFImpl for ArrayPopFront {
680 fn as_any(&self) -> &dyn Any {
681 self
682 }
683 fn name(&self) -> &str {
684 "array_pop_front"
685 }
686
687 fn signature(&self) -> &Signature {
688 &self.signature
689 }
690
691 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
692 Ok(arg_types[0].clone())
693 }
694
695 fn invoke_with_args(
696 &self,
697 args: datafusion_expr::ScalarFunctionArgs,
698 ) -> Result<ColumnarValue> {
699 make_scalar_function(array_pop_front_inner)(&args.args)
700 }
701
702 fn aliases(&self) -> &[String] {
703 &self.aliases
704 }
705
706 fn documentation(&self) -> Option<&Documentation> {
707 self.doc()
708 }
709}
710
711fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
713 let array_data_type = args[0].data_type();
714 match array_data_type {
715 List(_) => {
716 let array = as_list_array(&args[0])?;
717 general_pop_front_list::<i32>(array)
718 }
719 LargeList(_) => {
720 let array = as_large_list_array(&args[0])?;
721 general_pop_front_list::<i64>(array)
722 }
723 _ => exec_err!("array_pop_front does not support type: {}", array_data_type),
724 }
725}
726
727fn general_pop_front_list<O: OffsetSizeTrait>(
728 array: &GenericListArray<O>,
729) -> Result<ArrayRef>
730where
731 i64: TryInto<O>,
732{
733 let from_array = Int64Array::from(vec![2; array.len()]);
734 let to_array = Int64Array::from(
735 array
736 .iter()
737 .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
738 .collect::<Vec<i64>>(),
739 );
740 general_array_slice::<O>(array, &from_array, &to_array, None)
741}
742
743#[user_doc(
744 doc_section(label = "Array Functions"),
745 description = "Returns the array without the last element.",
746 syntax_example = "array_pop_back(array)",
747 sql_example = r#"```sql
748> select array_pop_back([1, 2, 3]);
749+-------------------------------+
750| array_pop_back(List([1,2,3])) |
751+-------------------------------+
752| [1, 2] |
753+-------------------------------+
754```"#,
755 argument(
756 name = "array",
757 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
758 )
759)]
760#[derive(Debug, PartialEq, Eq, Hash)]
761pub(super) struct ArrayPopBack {
762 signature: Signature,
763 aliases: Vec<String>,
764}
765
766impl ArrayPopBack {
767 pub fn new() -> Self {
768 Self {
769 signature: Signature::array(Volatility::Immutable),
770 aliases: vec![String::from("list_pop_back")],
771 }
772 }
773}
774
775impl ScalarUDFImpl for ArrayPopBack {
776 fn as_any(&self) -> &dyn Any {
777 self
778 }
779 fn name(&self) -> &str {
780 "array_pop_back"
781 }
782
783 fn signature(&self) -> &Signature {
784 &self.signature
785 }
786
787 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
788 Ok(arg_types[0].clone())
789 }
790
791 fn invoke_with_args(
792 &self,
793 args: datafusion_expr::ScalarFunctionArgs,
794 ) -> Result<ColumnarValue> {
795 make_scalar_function(array_pop_back_inner)(&args.args)
796 }
797
798 fn aliases(&self) -> &[String] {
799 &self.aliases
800 }
801
802 fn documentation(&self) -> Option<&Documentation> {
803 self.doc()
804 }
805}
806
807fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
809 let [array] = take_function_args("array_pop_back", args)?;
810
811 match array.data_type() {
812 List(_) => {
813 let array = as_list_array(&array)?;
814 general_pop_back_list::<i32>(array)
815 }
816 LargeList(_) => {
817 let array = as_large_list_array(&array)?;
818 general_pop_back_list::<i64>(array)
819 }
820 _ => exec_err!(
821 "array_pop_back does not support type: {}",
822 array.data_type()
823 ),
824 }
825}
826
827fn general_pop_back_list<O: OffsetSizeTrait>(
828 array: &GenericListArray<O>,
829) -> Result<ArrayRef>
830where
831 i64: TryInto<O>,
832{
833 let from_array = Int64Array::from(vec![1; array.len()]);
834 let to_array = Int64Array::from(
835 array
836 .iter()
837 .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
838 .collect::<Vec<i64>>(),
839 );
840 general_array_slice::<O>(array, &from_array, &to_array, None)
841}
842
843#[user_doc(
844 doc_section(label = "Array Functions"),
845 description = "Returns the first non-null element in the array.",
846 syntax_example = "array_any_value(array)",
847 sql_example = r#"```sql
848> select array_any_value([NULL, 1, 2, 3]);
849+-------------------------------+
850| array_any_value(List([NULL,1,2,3])) |
851+-------------------------------------+
852| 1 |
853+-------------------------------------+
854```"#,
855 argument(
856 name = "array",
857 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
858 )
859)]
860#[derive(Debug, PartialEq, Eq, Hash)]
861pub(super) struct ArrayAnyValue {
862 signature: Signature,
863 aliases: Vec<String>,
864}
865
866impl ArrayAnyValue {
867 pub fn new() -> Self {
868 Self {
869 signature: Signature::array(Volatility::Immutable),
870 aliases: vec![String::from("list_any_value")],
871 }
872 }
873}
874
875impl ScalarUDFImpl for ArrayAnyValue {
876 fn as_any(&self) -> &dyn Any {
877 self
878 }
879 fn name(&self) -> &str {
880 "array_any_value"
881 }
882 fn signature(&self) -> &Signature {
883 &self.signature
884 }
885 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
886 match &arg_types[0] {
887 List(field)
888 | LargeList(field)
889 | FixedSizeList(field, _) => Ok(field.data_type().clone()),
890 _ => plan_err!(
891 "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
892 ),
893 }
894 }
895
896 fn invoke_with_args(
897 &self,
898 args: datafusion_expr::ScalarFunctionArgs,
899 ) -> Result<ColumnarValue> {
900 make_scalar_function(array_any_value_inner)(&args.args)
901 }
902
903 fn aliases(&self) -> &[String] {
904 &self.aliases
905 }
906
907 fn documentation(&self) -> Option<&Documentation> {
908 self.doc()
909 }
910}
911
912fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
913 let [array] = take_function_args("array_any_value", args)?;
914
915 match &array.data_type() {
916 List(_) => {
917 let array = as_list_array(&array)?;
918 general_array_any_value::<i32>(array)
919 }
920 LargeList(_) => {
921 let array = as_large_list_array(&array)?;
922 general_array_any_value::<i64>(array)
923 }
924 data_type => exec_err!("array_any_value does not support type: {data_type}"),
925 }
926}
927
928fn general_array_any_value<O: OffsetSizeTrait>(
929 array: &GenericListArray<O>,
930) -> Result<ArrayRef>
931where
932 i64: TryInto<O>,
933{
934 let values = array.values();
935 let original_data = values.to_data();
936 let capacity = Capacities::Array(array.len());
937
938 let mut mutable =
939 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
940
941 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
942 let start = offset_window[0];
943 let end = offset_window[1];
944 let len = end - start;
945
946 if len == O::usize_as(0) {
948 mutable.extend_nulls(1);
949 continue;
950 }
951
952 let row_value = array.value(row_index);
953 match row_value.nulls() {
954 Some(row_nulls_buffer) => {
955 if let Some(first_non_null_index) =
957 row_nulls_buffer.valid_indices().next()
958 {
959 let index = start.as_usize() + first_non_null_index;
960 mutable.extend(0, index, index + 1)
961 } else {
962 mutable.extend_nulls(1);
964 }
965 }
966 None => {
967 let index = start.as_usize();
969 mutable.extend(0, index, index + 1);
970 }
971 }
972 }
973
974 let data = mutable.freeze();
975 Ok(arrow::array::make_array(data))
976}
977
#[cfg(test)]
mod tests {
    use super::array_element_udf;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;

    /// `array_element` over a list of fixed-size lists must report the inner
    /// `FixedSizeList` type, both from the UDF directly and when the call
    /// expression is resolved against a schema.
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let element_type = DataType::FixedSizeList(
            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
            13,
        );
        let list_type =
            DataType::List(Field::new_list_field(element_type.clone(), true).into());
        let index_type = DataType::Int64;

        // Direct return-type inference on the UDF.
        let udf = array_element_udf();
        let inferred = udf
            .return_type(&[list_type.clone(), index_type.clone()])
            .unwrap();
        assert_eq!(inferred, element_type);

        // Inference through an expression resolved against a schema.
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("my_array", list_type, false),
                Field::new("my_index", index_type, false),
            ]
            .into(),
            HashMap::default(),
        )
        .unwrap();
        let call = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: vec![
                Expr::Column(Column::new_unqualified("my_array")),
                Expr::Column(Column::new_unqualified("my_index")),
            ],
        });
        assert_eq!(
            ExprSchemable::get_type(&call, &schema).unwrap(),
            element_type
        );
    }
}