1use arrow::array::{
21 Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array,
22 MutableArrayData, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27 DataType::{FixedSizeList, LargeList, List},
28 Field,
29};
30use datafusion_common::cast::as_int64_array;
31use datafusion_common::cast::as_large_list_array;
32use datafusion_common::cast::as_list_array;
33use datafusion_common::utils::ListCoercion;
34use datafusion_common::{
35 exec_err, internal_datafusion_err, plan_err, utils::take_function_args,
36 DataFusionError, Result,
37};
38use datafusion_expr::{
39 ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
40};
41use datafusion_expr::{
42 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use std::any::Any;
46use std::sync::Arc;
47
48use crate::utils::make_scalar_function;
49
50make_udf_expr_and_func!(
52 ArrayElement,
53 array_element,
54 array element,
55 "extracts the element with the index n from the array.",
56 array_element_udf
57);
58
59create_func!(ArraySlice, array_slice_udf);
60
61make_udf_expr_and_func!(
62 ArrayPopFront,
63 array_pop_front,
64 array,
65 "returns the array without the first element.",
66 array_pop_front_udf
67);
68
69make_udf_expr_and_func!(
70 ArrayPopBack,
71 array_pop_back,
72 array,
73 "returns the array without the last element.",
74 array_pop_back_udf
75);
76
77make_udf_expr_and_func!(
78 ArrayAnyValue,
79 array_any_value,
80 array,
81 "returns the first non-null element in the array.",
82 array_any_value_udf
83);
84
85#[user_doc(
86 doc_section(label = "Array Functions"),
87 description = "Extracts the element with the index n from the array.",
88 syntax_example = "array_element(array, index)",
89 sql_example = r#"```sql
90> select array_element([1, 2, 3, 4], 3);
91+-----------------------------------------+
92| array_element(List([1,2,3,4]),Int64(3)) |
93+-----------------------------------------+
94| 3 |
95+-----------------------------------------+
96```"#,
97 argument(
98 name = "array",
99 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
100 ),
101 argument(
102 name = "index",
103 description = "Index to extract the element from the array."
104 )
105)]
106#[derive(Debug)]
107pub struct ArrayElement {
108 signature: Signature,
109 aliases: Vec<String>,
110}
111
112impl Default for ArrayElement {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl ArrayElement {
119 pub fn new() -> Self {
120 Self {
121 signature: Signature::array_and_index(Volatility::Immutable),
122 aliases: vec![
123 String::from("array_extract"),
124 String::from("list_element"),
125 String::from("list_extract"),
126 ],
127 }
128 }
129}
130
131impl ScalarUDFImpl for ArrayElement {
132 fn as_any(&self) -> &dyn Any {
133 self
134 }
135 fn name(&self) -> &str {
136 "array_element"
137 }
138
139 fn display_name(&self, args: &[Expr]) -> Result<String> {
140 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141 if args_name.len() != 2 {
142 return exec_err!("expect 2 args, got {}", args_name.len());
143 }
144
145 Ok(format!("{}[{}]", args_name[0], args_name[1]))
146 }
147
148 fn schema_name(&self, args: &[Expr]) -> Result<String> {
149 let args_name = args
150 .iter()
151 .map(|e| e.schema_name().to_string())
152 .collect::<Vec<_>>();
153 if args_name.len() != 2 {
154 return exec_err!("expect 2 args, got {}", args_name.len());
155 }
156
157 Ok(format!("{}[{}]", args_name[0], args_name[1]))
158 }
159
160 fn signature(&self) -> &Signature {
161 &self.signature
162 }
163
164 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165 match &arg_types[0] {
166 List(field)
167 | LargeList(field)
168 | FixedSizeList(field, _) => Ok(field.data_type().clone()),
169 DataType::Null => Ok(List(Arc::new(Field::new_list_field(DataType::Int64, true)))),
170 _ => plan_err!(
171 "ArrayElement can only accept List, LargeList or FixedSizeList as the first argument"
172 ),
173 }
174 }
175
176 fn invoke_with_args(
177 &self,
178 args: datafusion_expr::ScalarFunctionArgs,
179 ) -> Result<ColumnarValue> {
180 make_scalar_function(array_element_inner)(&args.args)
181 }
182
183 fn aliases(&self) -> &[String] {
184 &self.aliases
185 }
186
187 fn documentation(&self) -> Option<&Documentation> {
188 self.doc()
189 }
190}
191
192fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
200 let [array, indexes] = take_function_args("array_element", args)?;
201
202 match &array.data_type() {
203 List(_) => {
204 let array = as_list_array(&array)?;
205 let indexes = as_int64_array(&indexes)?;
206 general_array_element::<i32>(array, indexes)
207 }
208 LargeList(_) => {
209 let array = as_large_list_array(&array)?;
210 let indexes = as_int64_array(&indexes)?;
211 general_array_element::<i64>(array, indexes)
212 }
213 _ => exec_err!(
214 "array_element does not support type: {:?}",
215 array.data_type()
216 ),
217 }
218}
219
220fn general_array_element<O: OffsetSizeTrait>(
221 array: &GenericListArray<O>,
222 indexes: &Int64Array,
223) -> Result<ArrayRef>
224where
225 i64: TryInto<O>,
226{
227 let values = array.values();
228 let original_data = values.to_data();
229 let capacity = Capacities::Array(original_data.len());
230
231 let mut mutable =
233 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
234
235 fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
236 where
237 i64: TryInto<O>,
238 {
239 let index: O = index.try_into().map_err(|_| {
240 DataFusionError::Execution(format!(
241 "array_element got invalid index: {}",
242 index
243 ))
244 })?;
245 let adjusted_zero_index = if index < O::usize_as(0) {
247 index + len
248 } else {
249 index - O::usize_as(1)
250 };
251
252 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
253 Ok(Some(adjusted_zero_index))
254 } else {
255 Ok(None)
257 }
258 }
259
260 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
261 let start = offset_window[0];
262 let end = offset_window[1];
263 let len = end - start;
264
265 if len == O::usize_as(0) {
267 mutable.extend_nulls(1);
268 continue;
269 }
270
271 let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
272
273 if let Some(index) = index {
274 let start = start.as_usize() + index.as_usize();
275 mutable.extend(0, start, start + 1_usize);
276 } else {
277 mutable.extend_nulls(1);
279 }
280 }
281
282 let data = mutable.freeze();
283 Ok(arrow::array::make_array(data))
284}
285
286#[doc = "returns a slice of the array."]
287pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
288 let args = match stride {
289 Some(stride) => vec![array, begin, end, stride],
290 None => vec![array, begin, end],
291 };
292 array_slice_udf().call(args)
293}
294
295#[user_doc(
296 doc_section(label = "Array Functions"),
297 description = "Returns a slice of the array based on 1-indexed start and end positions.",
298 syntax_example = "array_slice(array, begin, end)",
299 sql_example = r#"```sql
300> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
301+--------------------------------------------------------+
302| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
303+--------------------------------------------------------+
304| [3, 4, 5, 6] |
305+--------------------------------------------------------+
306```"#,
307 argument(
308 name = "array",
309 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
310 ),
311 argument(
312 name = "begin",
313 description = "Index of the first element. If negative, it counts backward from the end of the array."
314 ),
315 argument(
316 name = "end",
317 description = "Index of the last element. If negative, it counts backward from the end of the array."
318 ),
319 argument(
320 name = "stride",
321 description = "Stride of the array slice. The default is 1."
322 )
323)]
324#[derive(Debug)]
325pub(super) struct ArraySlice {
326 signature: Signature,
327 aliases: Vec<String>,
328}
329
330impl ArraySlice {
331 pub fn new() -> Self {
332 Self {
333 signature: Signature::one_of(
334 vec![
335 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
336 arguments: vec![
337 ArrayFunctionArgument::Array,
338 ArrayFunctionArgument::Index,
339 ArrayFunctionArgument::Index,
340 ],
341 array_coercion: None,
342 }),
343 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
344 arguments: vec![
345 ArrayFunctionArgument::Array,
346 ArrayFunctionArgument::Index,
347 ArrayFunctionArgument::Index,
348 ArrayFunctionArgument::Index,
349 ],
350 array_coercion: None,
351 }),
352 ],
353 Volatility::Immutable,
354 ),
355 aliases: vec![String::from("list_slice")],
356 }
357 }
358}
359
360impl ScalarUDFImpl for ArraySlice {
361 fn as_any(&self) -> &dyn Any {
362 self
363 }
364
365 fn display_name(&self, args: &[Expr]) -> Result<String> {
366 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
367 if let Some((arr, indexes)) = args_name.split_first() {
368 Ok(format!("{arr}[{}]", indexes.join(":")))
369 } else {
370 exec_err!("no argument")
371 }
372 }
373
374 fn schema_name(&self, args: &[Expr]) -> Result<String> {
375 let args_name = args
376 .iter()
377 .map(|e| e.schema_name().to_string())
378 .collect::<Vec<_>>();
379 if let Some((arr, indexes)) = args_name.split_first() {
380 Ok(format!("{arr}[{}]", indexes.join(":")))
381 } else {
382 exec_err!("no argument")
383 }
384 }
385
386 fn name(&self) -> &str {
387 "array_slice"
388 }
389
390 fn signature(&self) -> &Signature {
391 &self.signature
392 }
393
394 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
395 Ok(arg_types[0].clone())
396 }
397
398 fn invoke_with_args(
399 &self,
400 args: datafusion_expr::ScalarFunctionArgs,
401 ) -> Result<ColumnarValue> {
402 make_scalar_function(array_slice_inner)(&args.args)
403 }
404
405 fn aliases(&self) -> &[String] {
406 &self.aliases
407 }
408
409 fn documentation(&self) -> Option<&Documentation> {
410 self.doc()
411 }
412}
413
414fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
431 let args_len = args.len();
432 if args_len != 3 && args_len != 4 {
433 return exec_err!("array_slice needs three or four arguments");
434 }
435
436 let stride = if args_len == 4 {
437 Some(as_int64_array(&args[3])?)
438 } else {
439 None
440 };
441
442 let from_array = as_int64_array(&args[1])?;
443 let to_array = as_int64_array(&args[2])?;
444
445 let array_data_type = args[0].data_type();
446 match array_data_type {
447 List(_) => {
448 let array = as_list_array(&args[0])?;
449 general_array_slice::<i32>(array, from_array, to_array, stride)
450 }
451 LargeList(_) => {
452 let array = as_large_list_array(&args[0])?;
453 general_array_slice::<i64>(array, from_array, to_array, stride)
454 }
455 _ => exec_err!("array_slice does not support type: {:?}", array_data_type),
456 }
457}
458
459fn general_array_slice<O: OffsetSizeTrait>(
460 array: &GenericListArray<O>,
461 from_array: &Int64Array,
462 to_array: &Int64Array,
463 stride: Option<&Int64Array>,
464) -> Result<ArrayRef>
465where
466 i64: TryInto<O>,
467{
468 let values = array.values();
469 let original_data = values.to_data();
470 let capacity = Capacities::Array(original_data.len());
471
472 let mut mutable =
473 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
474
475 fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
479 where
480 i64: TryInto<O>,
481 {
482 let adjusted_zero_index = if index < 0 {
484 if let Ok(index) = index.try_into() {
485 if index < (O::zero() - O::one()) * len {
492 O::zero()
493 } else {
494 index + len
495 }
496 } else {
497 return exec_err!("array_slice got invalid index: {}", index);
498 }
499 } else {
500 if let Ok(index) = index.try_into() {
502 std::cmp::max(index - O::usize_as(1), O::usize_as(0))
503 } else {
504 return exec_err!("array_slice got invalid index: {}", index);
505 }
506 };
507
508 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
509 Ok(Some(adjusted_zero_index))
510 } else {
511 Ok(None)
513 }
514 }
515
516 fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
517 where
518 i64: TryInto<O>,
519 {
520 let adjusted_zero_index = if index < 0 {
522 if let Ok(index) = index.try_into() {
524 index + len
525 } else {
526 return exec_err!("array_slice got invalid index: {}", index);
527 }
528 } else {
529 if let Ok(index) = index.try_into() {
531 std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
532 } else {
533 return exec_err!("array_slice got invalid index: {}", index);
534 }
535 };
536
537 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
538 Ok(Some(adjusted_zero_index))
539 } else {
540 Ok(None)
542 }
543 }
544
545 let mut offsets = vec![O::usize_as(0)];
546 let mut null_builder = NullBufferBuilder::new(array.len());
547
548 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
549 let start = offset_window[0];
550 let end = offset_window[1];
551 let len = end - start;
552
553 if array.is_null(row_index)
555 || from_array.is_null(row_index)
556 || to_array.is_null(row_index)
557 {
558 mutable.extend_nulls(1);
559 offsets.push(offsets[row_index] + O::usize_as(1));
560 null_builder.append_null();
561 continue;
562 }
563 null_builder.append_non_null();
564
565 if len == O::usize_as(0) {
567 offsets.push(offsets[row_index]);
568 continue;
569 }
570
571 let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
572 let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
573
574 if let (Some(from), Some(to)) = (from_index, to_index) {
575 let stride = stride.map(|s| s.value(row_index));
576 let stride = stride.unwrap_or(1);
578 if stride.is_zero() {
579 return exec_err!(
580 "array_slice got invalid stride: {:?}, it cannot be 0",
581 stride
582 );
583 } else if (from < to && stride.is_negative())
584 || (from > to && stride.is_positive())
585 {
586 offsets.push(offsets[row_index]);
588 continue;
589 }
590
591 let stride: O = stride.try_into().map_err(|_| {
592 internal_datafusion_err!("array_slice got invalid stride: {}", stride)
593 })?;
594
595 if from <= to && stride > O::zero() {
596 assert!(start + to <= end);
597 if stride.eq(&O::one()) {
598 mutable.extend(
600 0,
601 (start + from).to_usize().unwrap(),
602 (start + to + O::usize_as(1)).to_usize().unwrap(),
603 );
604 offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
605 continue;
606 }
607 let mut index = start + from;
608 let mut cnt = 0;
609 while index <= start + to {
610 mutable.extend(
611 0,
612 index.to_usize().unwrap(),
613 index.to_usize().unwrap() + 1,
614 );
615 index += stride;
616 cnt += 1;
617 }
618 offsets.push(offsets[row_index] + O::usize_as(cnt));
619 } else {
620 let mut index = start + from;
621 let mut cnt = 0;
622 while index >= start + to {
623 mutable.extend(
624 0,
625 index.to_usize().unwrap(),
626 index.to_usize().unwrap() + 1,
627 );
628 index += stride;
629 cnt += 1;
630 }
631 offsets.push(offsets[row_index] + O::usize_as(cnt));
633 }
634 } else {
635 offsets.push(offsets[row_index]);
637 }
638 }
639
640 let data = mutable.freeze();
641
642 Ok(Arc::new(GenericListArray::<O>::try_new(
643 Arc::new(Field::new_list_field(array.value_type(), true)),
644 OffsetBuffer::<O>::new(offsets.into()),
645 arrow::array::make_array(data),
646 null_builder.finish(),
647 )?))
648}
649
650#[user_doc(
651 doc_section(label = "Array Functions"),
652 description = "Returns the array without the first element.",
653 syntax_example = "array_pop_front(array)",
654 sql_example = r#"```sql
655> select array_pop_front([1, 2, 3]);
656+-------------------------------+
657| array_pop_front(List([1,2,3])) |
658+-------------------------------+
659| [2, 3] |
660+-------------------------------+
661```"#,
662 argument(
663 name = "array",
664 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
665 )
666)]
667#[derive(Debug)]
668pub(super) struct ArrayPopFront {
669 signature: Signature,
670 aliases: Vec<String>,
671}
672
673impl ArrayPopFront {
674 pub fn new() -> Self {
675 Self {
676 signature: Signature {
677 type_signature: TypeSignature::ArraySignature(
678 ArrayFunctionSignature::Array {
679 arguments: vec![ArrayFunctionArgument::Array],
680 array_coercion: Some(ListCoercion::FixedSizedListToList),
681 },
682 ),
683 volatility: Volatility::Immutable,
684 },
685 aliases: vec![String::from("list_pop_front")],
686 }
687 }
688}
689
690impl ScalarUDFImpl for ArrayPopFront {
691 fn as_any(&self) -> &dyn Any {
692 self
693 }
694 fn name(&self) -> &str {
695 "array_pop_front"
696 }
697
698 fn signature(&self) -> &Signature {
699 &self.signature
700 }
701
702 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
703 Ok(arg_types[0].clone())
704 }
705
706 fn invoke_with_args(
707 &self,
708 args: datafusion_expr::ScalarFunctionArgs,
709 ) -> Result<ColumnarValue> {
710 make_scalar_function(array_pop_front_inner)(&args.args)
711 }
712
713 fn aliases(&self) -> &[String] {
714 &self.aliases
715 }
716
717 fn documentation(&self) -> Option<&Documentation> {
718 self.doc()
719 }
720}
721
722fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
724 let array_data_type = args[0].data_type();
725 match array_data_type {
726 List(_) => {
727 let array = as_list_array(&args[0])?;
728 general_pop_front_list::<i32>(array)
729 }
730 LargeList(_) => {
731 let array = as_large_list_array(&args[0])?;
732 general_pop_front_list::<i64>(array)
733 }
734 _ => exec_err!(
735 "array_pop_front does not support type: {:?}",
736 array_data_type
737 ),
738 }
739}
740
741fn general_pop_front_list<O: OffsetSizeTrait>(
742 array: &GenericListArray<O>,
743) -> Result<ArrayRef>
744where
745 i64: TryInto<O>,
746{
747 let from_array = Int64Array::from(vec![2; array.len()]);
748 let to_array = Int64Array::from(
749 array
750 .iter()
751 .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
752 .collect::<Vec<i64>>(),
753 );
754 general_array_slice::<O>(array, &from_array, &to_array, None)
755}
756
757#[user_doc(
758 doc_section(label = "Array Functions"),
759 description = "Returns the array without the last element.",
760 syntax_example = "array_pop_back(array)",
761 sql_example = r#"```sql
762> select array_pop_back([1, 2, 3]);
763+-------------------------------+
764| array_pop_back(List([1,2,3])) |
765+-------------------------------+
766| [1, 2] |
767+-------------------------------+
768```"#,
769 argument(
770 name = "array",
771 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
772 )
773)]
774#[derive(Debug)]
775pub(super) struct ArrayPopBack {
776 signature: Signature,
777 aliases: Vec<String>,
778}
779
780impl ArrayPopBack {
781 pub fn new() -> Self {
782 Self {
783 signature: Signature {
784 type_signature: TypeSignature::ArraySignature(
785 ArrayFunctionSignature::Array {
786 arguments: vec![ArrayFunctionArgument::Array],
787 array_coercion: Some(ListCoercion::FixedSizedListToList),
788 },
789 ),
790 volatility: Volatility::Immutable,
791 },
792 aliases: vec![String::from("list_pop_back")],
793 }
794 }
795}
796
797impl ScalarUDFImpl for ArrayPopBack {
798 fn as_any(&self) -> &dyn Any {
799 self
800 }
801 fn name(&self) -> &str {
802 "array_pop_back"
803 }
804
805 fn signature(&self) -> &Signature {
806 &self.signature
807 }
808
809 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
810 Ok(arg_types[0].clone())
811 }
812
813 fn invoke_with_args(
814 &self,
815 args: datafusion_expr::ScalarFunctionArgs,
816 ) -> Result<ColumnarValue> {
817 make_scalar_function(array_pop_back_inner)(&args.args)
818 }
819
820 fn aliases(&self) -> &[String] {
821 &self.aliases
822 }
823
824 fn documentation(&self) -> Option<&Documentation> {
825 self.doc()
826 }
827}
828
829fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
831 let [array] = take_function_args("array_pop_back", args)?;
832
833 match array.data_type() {
834 List(_) => {
835 let array = as_list_array(&array)?;
836 general_pop_back_list::<i32>(array)
837 }
838 LargeList(_) => {
839 let array = as_large_list_array(&array)?;
840 general_pop_back_list::<i64>(array)
841 }
842 _ => exec_err!(
843 "array_pop_back does not support type: {:?}",
844 array.data_type()
845 ),
846 }
847}
848
849fn general_pop_back_list<O: OffsetSizeTrait>(
850 array: &GenericListArray<O>,
851) -> Result<ArrayRef>
852where
853 i64: TryInto<O>,
854{
855 let from_array = Int64Array::from(vec![1; array.len()]);
856 let to_array = Int64Array::from(
857 array
858 .iter()
859 .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
860 .collect::<Vec<i64>>(),
861 );
862 general_array_slice::<O>(array, &from_array, &to_array, None)
863}
864
865#[user_doc(
866 doc_section(label = "Array Functions"),
867 description = "Returns the first non-null element in the array.",
868 syntax_example = "array_any_value(array)",
869 sql_example = r#"```sql
870> select array_any_value([NULL, 1, 2, 3]);
871+-------------------------------+
872| array_any_value(List([NULL,1,2,3])) |
873+-------------------------------------+
874| 1 |
875+-------------------------------------+
876```"#,
877 argument(
878 name = "array",
879 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
880 )
881)]
882#[derive(Debug)]
883pub(super) struct ArrayAnyValue {
884 signature: Signature,
885 aliases: Vec<String>,
886}
887
888impl ArrayAnyValue {
889 pub fn new() -> Self {
890 Self {
891 signature: Signature::array(Volatility::Immutable),
892 aliases: vec![String::from("list_any_value")],
893 }
894 }
895}
896
897impl ScalarUDFImpl for ArrayAnyValue {
898 fn as_any(&self) -> &dyn Any {
899 self
900 }
901 fn name(&self) -> &str {
902 "array_any_value"
903 }
904 fn signature(&self) -> &Signature {
905 &self.signature
906 }
907 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
908 match &arg_types[0] {
909 List(field)
910 | LargeList(field)
911 | FixedSizeList(field, _) => Ok(field.data_type().clone()),
912 _ => plan_err!(
913 "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
914 ),
915 }
916 }
917
918 fn invoke_with_args(
919 &self,
920 args: datafusion_expr::ScalarFunctionArgs,
921 ) -> Result<ColumnarValue> {
922 make_scalar_function(array_any_value_inner)(&args.args)
923 }
924
925 fn aliases(&self) -> &[String] {
926 &self.aliases
927 }
928
929 fn documentation(&self) -> Option<&Documentation> {
930 self.doc()
931 }
932}
933
934fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
935 let [array] = take_function_args("array_any_value", args)?;
936
937 match &array.data_type() {
938 List(_) => {
939 let array = as_list_array(&array)?;
940 general_array_any_value::<i32>(array)
941 }
942 LargeList(_) => {
943 let array = as_large_list_array(&array)?;
944 general_array_any_value::<i64>(array)
945 }
946 data_type => exec_err!("array_any_value does not support type: {:?}", data_type),
947 }
948}
949
950fn general_array_any_value<O: OffsetSizeTrait>(
951 array: &GenericListArray<O>,
952) -> Result<ArrayRef>
953where
954 i64: TryInto<O>,
955{
956 let values = array.values();
957 let original_data = values.to_data();
958 let capacity = Capacities::Array(array.len());
959
960 let mut mutable =
961 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
962
963 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
964 let start = offset_window[0];
965 let end = offset_window[1];
966 let len = end - start;
967
968 if len == O::usize_as(0) {
970 mutable.extend_nulls(1);
971 continue;
972 }
973
974 let row_value = array.value(row_index);
975 match row_value.nulls() {
976 Some(row_nulls_buffer) => {
977 if let Some(first_non_null_index) =
979 row_nulls_buffer.valid_indices().next()
980 {
981 let index = start.as_usize() + first_non_null_index;
982 mutable.extend(0, index, index + 1)
983 } else {
984 mutable.extend_nulls(1);
986 }
987 }
988 None => {
989 let index = start.as_usize();
991 mutable.extend(0, index, index + 1);
992 }
993 }
994 }
995
996 let data = mutable.freeze();
997 Ok(arrow::array::make_array(data))
998}
999
#[cfg(test)]
mod tests {
    use super::array_element_udf;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;

    /// `array_element` over `List<FixedSizeList<Int32>>` must report the inner
    /// `FixedSizeList` type, both directly and through expression schema
    /// resolution.
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let element_type = DataType::FixedSizeList(
            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
            13,
        );
        let list_type =
            DataType::List(Field::new_list_field(element_type.clone(), true).into());
        let index_type = DataType::Int64;

        // Direct return-type inference.
        let udf = array_element_udf();
        let inferred = udf
            .return_type(&[list_type.clone(), index_type.clone()])
            .unwrap();
        assert_eq!(inferred, element_type);

        // Inference through an expression that references schema columns.
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("my_array", list_type, false),
                Field::new("my_index", index_type, false),
            ]
            .into(),
            HashMap::default(),
        )
        .unwrap();
        let call = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: vec![
                Expr::Column(Column::new_unqualified("my_array")),
                Expr::Column(Column::new_unqualified("my_index")),
            ],
        });
        assert_eq!(
            ExprSchemable::get_type(&call, &schema).unwrap(),
            element_type
        );
    }
}