1use arrow::array::{
21 Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array,
22 MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
23};
24use arrow::buffer::OffsetBuffer;
25use arrow::datatypes::DataType;
26use arrow::datatypes::{
27 DataType::{FixedSizeList, LargeList, List, Null},
28 Field,
29};
30use datafusion_common::cast::as_int64_array;
31use datafusion_common::cast::as_large_list_array;
32use datafusion_common::cast::as_list_array;
33use datafusion_common::utils::ListCoercion;
34use datafusion_common::{
35 exec_err, internal_datafusion_err, plan_err, utils::take_function_args,
36 DataFusionError, Result,
37};
38use datafusion_expr::{
39 ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
40};
41use datafusion_expr::{
42 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use std::any::Any;
46use std::sync::Arc;
47
48use crate::utils::make_scalar_function;
49
// UDF registrations for the element-extraction functions in this file.
// NOTE(review): `make_udf_expr_and_func!` and `create_func!` are defined outside
// this file; presumably the former generates both an expression-builder function
// (second argument) and a UDF accessor (last argument) for the given struct —
// confirm against the macro definitions.
make_udf_expr_and_func!(
    ArrayElement,
    array_element,
    array element,
    "extracts the element with the index n from the array.",
    array_element_udf
);

// `array_slice` has an optional `stride` argument, so its expression builder is
// written by hand (see `array_slice` in this file, which calls `array_slice_udf()`).
create_func!(ArraySlice, array_slice_udf);

make_udf_expr_and_func!(
    ArrayPopFront,
    array_pop_front,
    array,
    "returns the array without the first element.",
    array_pop_front_udf
);

make_udf_expr_and_func!(
    ArrayPopBack,
    array_pop_back,
    array,
    "returns the array without the last element.",
    array_pop_back_udf
);

make_udf_expr_and_func!(
    ArrayAnyValue,
    array_any_value,
    array,
    "returns the first non-null element in the array.",
    array_any_value_udf
);
84
85#[user_doc(
86 doc_section(label = "Array Functions"),
87 description = "Extracts the element with the index n from the array.",
88 syntax_example = "array_element(array, index)",
89 sql_example = r#"```sql
90> select array_element([1, 2, 3, 4], 3);
91+-----------------------------------------+
92| array_element(List([1,2,3,4]),Int64(3)) |
93+-----------------------------------------+
94| 3 |
95+-----------------------------------------+
96```"#,
97 argument(
98 name = "array",
99 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
100 ),
101 argument(
102 name = "index",
103 description = "Index to extract the element from the array."
104 )
105)]
106#[derive(Debug)]
107pub struct ArrayElement {
108 signature: Signature,
109 aliases: Vec<String>,
110}
111
112impl Default for ArrayElement {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl ArrayElement {
119 pub fn new() -> Self {
120 Self {
121 signature: Signature::array_and_index(Volatility::Immutable),
122 aliases: vec![
123 String::from("array_extract"),
124 String::from("list_element"),
125 String::from("list_extract"),
126 ],
127 }
128 }
129}
130
131impl ScalarUDFImpl for ArrayElement {
132 fn as_any(&self) -> &dyn Any {
133 self
134 }
135 fn name(&self) -> &str {
136 "array_element"
137 }
138
139 fn display_name(&self, args: &[Expr]) -> Result<String> {
140 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
141 if args_name.len() != 2 {
142 return exec_err!("expect 2 args, got {}", args_name.len());
143 }
144
145 Ok(format!("{}[{}]", args_name[0], args_name[1]))
146 }
147
148 fn schema_name(&self, args: &[Expr]) -> Result<String> {
149 let args_name = args
150 .iter()
151 .map(|e| e.schema_name().to_string())
152 .collect::<Vec<_>>();
153 if args_name.len() != 2 {
154 return exec_err!("expect 2 args, got {}", args_name.len());
155 }
156
157 Ok(format!("{}[{}]", args_name[0], args_name[1]))
158 }
159
160 fn signature(&self) -> &Signature {
161 &self.signature
162 }
163
164 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
165 match &arg_types[0] {
166 Null => Ok(Null),
167 List(field) | LargeList(field) => Ok(field.data_type().clone()),
168 arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
169 }
170 }
171
172 fn invoke_with_args(
173 &self,
174 args: datafusion_expr::ScalarFunctionArgs,
175 ) -> Result<ColumnarValue> {
176 make_scalar_function(array_element_inner)(&args.args)
177 }
178
179 fn aliases(&self) -> &[String] {
180 &self.aliases
181 }
182
183 fn documentation(&self) -> Option<&Documentation> {
184 self.doc()
185 }
186}
187
188fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
196 let [array, indexes] = take_function_args("array_element", args)?;
197
198 match &array.data_type() {
199 Null => Ok(Arc::new(NullArray::new(array.len()))),
200 List(_) => {
201 let array = as_list_array(&array)?;
202 let indexes = as_int64_array(&indexes)?;
203 general_array_element::<i32>(array, indexes)
204 }
205 LargeList(_) => {
206 let array = as_large_list_array(&array)?;
207 let indexes = as_int64_array(&indexes)?;
208 general_array_element::<i64>(array, indexes)
209 }
210 arg_type => {
211 exec_err!("array_element does not support type {arg_type}")
212 }
213 }
214}
215
216fn general_array_element<O: OffsetSizeTrait>(
217 array: &GenericListArray<O>,
218 indexes: &Int64Array,
219) -> Result<ArrayRef>
220where
221 i64: TryInto<O>,
222{
223 let values = array.values();
224 if values.data_type().is_null() {
225 return Ok(Arc::new(NullArray::new(array.len())));
226 }
227
228 let original_data = values.to_data();
229 let capacity = Capacities::Array(original_data.len());
230
231 let mut mutable =
233 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
234
235 fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
236 where
237 i64: TryInto<O>,
238 {
239 let index: O = index.try_into().map_err(|_| {
240 DataFusionError::Execution(format!(
241 "array_element got invalid index: {index}"
242 ))
243 })?;
244 let adjusted_zero_index = if index < O::usize_as(0) {
246 index + len
247 } else {
248 index - O::usize_as(1)
249 };
250
251 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
252 Ok(Some(adjusted_zero_index))
253 } else {
254 Ok(None)
256 }
257 }
258
259 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
260 let start = offset_window[0];
261 let end = offset_window[1];
262 let len = end - start;
263
264 if len == O::usize_as(0) {
266 mutable.extend_nulls(1);
267 continue;
268 }
269
270 let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
271
272 if let Some(index) = index {
273 let start = start.as_usize() + index.as_usize();
274 mutable.extend(0, start, start + 1_usize);
275 } else {
276 mutable.extend_nulls(1);
278 }
279 }
280
281 let data = mutable.freeze();
282 Ok(arrow::array::make_array(data))
283}
284
285#[doc = "returns a slice of the array."]
286pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
287 let args = match stride {
288 Some(stride) => vec![array, begin, end, stride],
289 None => vec![array, begin, end],
290 };
291 array_slice_udf().call(args)
292}
293
294#[user_doc(
295 doc_section(label = "Array Functions"),
296 description = "Returns a slice of the array based on 1-indexed start and end positions.",
297 syntax_example = "array_slice(array, begin, end)",
298 sql_example = r#"```sql
299> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);
300+--------------------------------------------------------+
301| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |
302+--------------------------------------------------------+
303| [3, 4, 5, 6] |
304+--------------------------------------------------------+
305```"#,
306 argument(
307 name = "array",
308 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
309 ),
310 argument(
311 name = "begin",
312 description = "Index of the first element. If negative, it counts backward from the end of the array."
313 ),
314 argument(
315 name = "end",
316 description = "Index of the last element. If negative, it counts backward from the end of the array."
317 ),
318 argument(
319 name = "stride",
320 description = "Stride of the array slice. The default is 1."
321 )
322)]
323#[derive(Debug)]
324pub(super) struct ArraySlice {
325 signature: Signature,
326 aliases: Vec<String>,
327}
328
329impl ArraySlice {
330 pub fn new() -> Self {
331 Self {
332 signature: Signature::one_of(
333 vec![
334 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
335 arguments: vec![
336 ArrayFunctionArgument::Array,
337 ArrayFunctionArgument::Index,
338 ArrayFunctionArgument::Index,
339 ],
340 array_coercion: None,
341 }),
342 TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
343 arguments: vec![
344 ArrayFunctionArgument::Array,
345 ArrayFunctionArgument::Index,
346 ArrayFunctionArgument::Index,
347 ArrayFunctionArgument::Index,
348 ],
349 array_coercion: None,
350 }),
351 ],
352 Volatility::Immutable,
353 ),
354 aliases: vec![String::from("list_slice")],
355 }
356 }
357}
358
359impl ScalarUDFImpl for ArraySlice {
360 fn as_any(&self) -> &dyn Any {
361 self
362 }
363
364 fn display_name(&self, args: &[Expr]) -> Result<String> {
365 let args_name = args.iter().map(ToString::to_string).collect::<Vec<_>>();
366 if let Some((arr, indexes)) = args_name.split_first() {
367 Ok(format!("{arr}[{}]", indexes.join(":")))
368 } else {
369 exec_err!("no argument")
370 }
371 }
372
373 fn schema_name(&self, args: &[Expr]) -> Result<String> {
374 let args_name = args
375 .iter()
376 .map(|e| e.schema_name().to_string())
377 .collect::<Vec<_>>();
378 if let Some((arr, indexes)) = args_name.split_first() {
379 Ok(format!("{arr}[{}]", indexes.join(":")))
380 } else {
381 exec_err!("no argument")
382 }
383 }
384
385 fn name(&self) -> &str {
386 "array_slice"
387 }
388
389 fn signature(&self) -> &Signature {
390 &self.signature
391 }
392
393 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
394 Ok(arg_types[0].clone())
395 }
396
397 fn invoke_with_args(
398 &self,
399 args: datafusion_expr::ScalarFunctionArgs,
400 ) -> Result<ColumnarValue> {
401 make_scalar_function(array_slice_inner)(&args.args)
402 }
403
404 fn aliases(&self) -> &[String] {
405 &self.aliases
406 }
407
408 fn documentation(&self) -> Option<&Documentation> {
409 self.doc()
410 }
411}
412
413fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
430 let args_len = args.len();
431 if args_len != 3 && args_len != 4 {
432 return exec_err!("array_slice needs three or four arguments");
433 }
434
435 let stride = if args_len == 4 {
436 Some(as_int64_array(&args[3])?)
437 } else {
438 None
439 };
440
441 let from_array = as_int64_array(&args[1])?;
442 let to_array = as_int64_array(&args[2])?;
443
444 let array_data_type = args[0].data_type();
445 match array_data_type {
446 List(_) => {
447 let array = as_list_array(&args[0])?;
448 general_array_slice::<i32>(array, from_array, to_array, stride)
449 }
450 LargeList(_) => {
451 let array = as_large_list_array(&args[0])?;
452 general_array_slice::<i64>(array, from_array, to_array, stride)
453 }
454 _ => exec_err!("array_slice does not support type: {:?}", array_data_type),
455 }
456}
457
458fn general_array_slice<O: OffsetSizeTrait>(
459 array: &GenericListArray<O>,
460 from_array: &Int64Array,
461 to_array: &Int64Array,
462 stride: Option<&Int64Array>,
463) -> Result<ArrayRef>
464where
465 i64: TryInto<O>,
466{
467 let values = array.values();
468 let original_data = values.to_data();
469 let capacity = Capacities::Array(original_data.len());
470
471 let mut mutable =
472 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
473
474 fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
478 where
479 i64: TryInto<O>,
480 {
481 let adjusted_zero_index = if index < 0 {
483 if let Ok(index) = index.try_into() {
484 if index < (O::zero() - O::one()) * len {
491 O::zero()
492 } else {
493 index + len
494 }
495 } else {
496 return exec_err!("array_slice got invalid index: {}", index);
497 }
498 } else {
499 if let Ok(index) = index.try_into() {
501 std::cmp::max(index - O::usize_as(1), O::usize_as(0))
502 } else {
503 return exec_err!("array_slice got invalid index: {}", index);
504 }
505 };
506
507 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
508 Ok(Some(adjusted_zero_index))
509 } else {
510 Ok(None)
512 }
513 }
514
515 fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
516 where
517 i64: TryInto<O>,
518 {
519 let adjusted_zero_index = if index < 0 {
521 if let Ok(index) = index.try_into() {
523 index + len
524 } else {
525 return exec_err!("array_slice got invalid index: {}", index);
526 }
527 } else {
528 if let Ok(index) = index.try_into() {
530 std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
531 } else {
532 return exec_err!("array_slice got invalid index: {}", index);
533 }
534 };
535
536 if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
537 Ok(Some(adjusted_zero_index))
538 } else {
539 Ok(None)
541 }
542 }
543
544 let mut offsets = vec![O::usize_as(0)];
545 let mut null_builder = NullBufferBuilder::new(array.len());
546
547 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
548 let start = offset_window[0];
549 let end = offset_window[1];
550 let len = end - start;
551
552 if array.is_null(row_index)
554 || from_array.is_null(row_index)
555 || to_array.is_null(row_index)
556 {
557 mutable.extend_nulls(1);
558 offsets.push(offsets[row_index] + O::usize_as(1));
559 null_builder.append_null();
560 continue;
561 }
562 null_builder.append_non_null();
563
564 if len == O::usize_as(0) {
566 offsets.push(offsets[row_index]);
567 continue;
568 }
569
570 let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
571 let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
572
573 if let (Some(from), Some(to)) = (from_index, to_index) {
574 let stride = stride.map(|s| s.value(row_index));
575 let stride = stride.unwrap_or(1);
577 if stride.is_zero() {
578 return exec_err!(
579 "array_slice got invalid stride: {:?}, it cannot be 0",
580 stride
581 );
582 } else if (from < to && stride.is_negative())
583 || (from > to && stride.is_positive())
584 {
585 offsets.push(offsets[row_index]);
587 continue;
588 }
589
590 let stride: O = stride.try_into().map_err(|_| {
591 internal_datafusion_err!("array_slice got invalid stride: {}", stride)
592 })?;
593
594 if from <= to && stride > O::zero() {
595 assert!(start + to <= end);
596 if stride.eq(&O::one()) {
597 mutable.extend(
599 0,
600 (start + from).to_usize().unwrap(),
601 (start + to + O::usize_as(1)).to_usize().unwrap(),
602 );
603 offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
604 continue;
605 }
606 let mut index = start + from;
607 let mut cnt = 0;
608 while index <= start + to {
609 mutable.extend(
610 0,
611 index.to_usize().unwrap(),
612 index.to_usize().unwrap() + 1,
613 );
614 index += stride;
615 cnt += 1;
616 }
617 offsets.push(offsets[row_index] + O::usize_as(cnt));
618 } else {
619 let mut index = start + from;
620 let mut cnt = 0;
621 while index >= start + to {
622 mutable.extend(
623 0,
624 index.to_usize().unwrap(),
625 index.to_usize().unwrap() + 1,
626 );
627 index += stride;
628 cnt += 1;
629 }
630 offsets.push(offsets[row_index] + O::usize_as(cnt));
632 }
633 } else {
634 offsets.push(offsets[row_index]);
636 }
637 }
638
639 let data = mutable.freeze();
640
641 Ok(Arc::new(GenericListArray::<O>::try_new(
642 Arc::new(Field::new_list_field(array.value_type(), true)),
643 OffsetBuffer::<O>::new(offsets.into()),
644 arrow::array::make_array(data),
645 null_builder.finish(),
646 )?))
647}
648
649#[user_doc(
650 doc_section(label = "Array Functions"),
651 description = "Returns the array without the first element.",
652 syntax_example = "array_pop_front(array)",
653 sql_example = r#"```sql
654> select array_pop_front([1, 2, 3]);
655+-------------------------------+
656| array_pop_front(List([1,2,3])) |
657+-------------------------------+
658| [2, 3] |
659+-------------------------------+
660```"#,
661 argument(
662 name = "array",
663 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
664 )
665)]
666#[derive(Debug)]
667pub(super) struct ArrayPopFront {
668 signature: Signature,
669 aliases: Vec<String>,
670}
671
672impl ArrayPopFront {
673 pub fn new() -> Self {
674 Self {
675 signature: Signature {
676 type_signature: TypeSignature::ArraySignature(
677 ArrayFunctionSignature::Array {
678 arguments: vec![ArrayFunctionArgument::Array],
679 array_coercion: Some(ListCoercion::FixedSizedListToList),
680 },
681 ),
682 volatility: Volatility::Immutable,
683 },
684 aliases: vec![String::from("list_pop_front")],
685 }
686 }
687}
688
689impl ScalarUDFImpl for ArrayPopFront {
690 fn as_any(&self) -> &dyn Any {
691 self
692 }
693 fn name(&self) -> &str {
694 "array_pop_front"
695 }
696
697 fn signature(&self) -> &Signature {
698 &self.signature
699 }
700
701 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
702 Ok(arg_types[0].clone())
703 }
704
705 fn invoke_with_args(
706 &self,
707 args: datafusion_expr::ScalarFunctionArgs,
708 ) -> Result<ColumnarValue> {
709 make_scalar_function(array_pop_front_inner)(&args.args)
710 }
711
712 fn aliases(&self) -> &[String] {
713 &self.aliases
714 }
715
716 fn documentation(&self) -> Option<&Documentation> {
717 self.doc()
718 }
719}
720
721fn array_pop_front_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
723 let array_data_type = args[0].data_type();
724 match array_data_type {
725 List(_) => {
726 let array = as_list_array(&args[0])?;
727 general_pop_front_list::<i32>(array)
728 }
729 LargeList(_) => {
730 let array = as_large_list_array(&args[0])?;
731 general_pop_front_list::<i64>(array)
732 }
733 _ => exec_err!(
734 "array_pop_front does not support type: {:?}",
735 array_data_type
736 ),
737 }
738}
739
740fn general_pop_front_list<O: OffsetSizeTrait>(
741 array: &GenericListArray<O>,
742) -> Result<ArrayRef>
743where
744 i64: TryInto<O>,
745{
746 let from_array = Int64Array::from(vec![2; array.len()]);
747 let to_array = Int64Array::from(
748 array
749 .iter()
750 .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
751 .collect::<Vec<i64>>(),
752 );
753 general_array_slice::<O>(array, &from_array, &to_array, None)
754}
755
756#[user_doc(
757 doc_section(label = "Array Functions"),
758 description = "Returns the array without the last element.",
759 syntax_example = "array_pop_back(array)",
760 sql_example = r#"```sql
761> select array_pop_back([1, 2, 3]);
762+-------------------------------+
763| array_pop_back(List([1,2,3])) |
764+-------------------------------+
765| [1, 2] |
766+-------------------------------+
767```"#,
768 argument(
769 name = "array",
770 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
771 )
772)]
773#[derive(Debug)]
774pub(super) struct ArrayPopBack {
775 signature: Signature,
776 aliases: Vec<String>,
777}
778
779impl ArrayPopBack {
780 pub fn new() -> Self {
781 Self {
782 signature: Signature {
783 type_signature: TypeSignature::ArraySignature(
784 ArrayFunctionSignature::Array {
785 arguments: vec![ArrayFunctionArgument::Array],
786 array_coercion: Some(ListCoercion::FixedSizedListToList),
787 },
788 ),
789 volatility: Volatility::Immutable,
790 },
791 aliases: vec![String::from("list_pop_back")],
792 }
793 }
794}
795
796impl ScalarUDFImpl for ArrayPopBack {
797 fn as_any(&self) -> &dyn Any {
798 self
799 }
800 fn name(&self) -> &str {
801 "array_pop_back"
802 }
803
804 fn signature(&self) -> &Signature {
805 &self.signature
806 }
807
808 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
809 Ok(arg_types[0].clone())
810 }
811
812 fn invoke_with_args(
813 &self,
814 args: datafusion_expr::ScalarFunctionArgs,
815 ) -> Result<ColumnarValue> {
816 make_scalar_function(array_pop_back_inner)(&args.args)
817 }
818
819 fn aliases(&self) -> &[String] {
820 &self.aliases
821 }
822
823 fn documentation(&self) -> Option<&Documentation> {
824 self.doc()
825 }
826}
827
828fn array_pop_back_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
830 let [array] = take_function_args("array_pop_back", args)?;
831
832 match array.data_type() {
833 List(_) => {
834 let array = as_list_array(&array)?;
835 general_pop_back_list::<i32>(array)
836 }
837 LargeList(_) => {
838 let array = as_large_list_array(&array)?;
839 general_pop_back_list::<i64>(array)
840 }
841 _ => exec_err!(
842 "array_pop_back does not support type: {:?}",
843 array.data_type()
844 ),
845 }
846}
847
848fn general_pop_back_list<O: OffsetSizeTrait>(
849 array: &GenericListArray<O>,
850) -> Result<ArrayRef>
851where
852 i64: TryInto<O>,
853{
854 let from_array = Int64Array::from(vec![1; array.len()]);
855 let to_array = Int64Array::from(
856 array
857 .iter()
858 .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
859 .collect::<Vec<i64>>(),
860 );
861 general_array_slice::<O>(array, &from_array, &to_array, None)
862}
863
864#[user_doc(
865 doc_section(label = "Array Functions"),
866 description = "Returns the first non-null element in the array.",
867 syntax_example = "array_any_value(array)",
868 sql_example = r#"```sql
869> select array_any_value([NULL, 1, 2, 3]);
870+-------------------------------+
871| array_any_value(List([NULL,1,2,3])) |
872+-------------------------------------+
873| 1 |
874+-------------------------------------+
875```"#,
876 argument(
877 name = "array",
878 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
879 )
880)]
881#[derive(Debug)]
882pub(super) struct ArrayAnyValue {
883 signature: Signature,
884 aliases: Vec<String>,
885}
886
887impl ArrayAnyValue {
888 pub fn new() -> Self {
889 Self {
890 signature: Signature::array(Volatility::Immutable),
891 aliases: vec![String::from("list_any_value")],
892 }
893 }
894}
895
896impl ScalarUDFImpl for ArrayAnyValue {
897 fn as_any(&self) -> &dyn Any {
898 self
899 }
900 fn name(&self) -> &str {
901 "array_any_value"
902 }
903 fn signature(&self) -> &Signature {
904 &self.signature
905 }
906 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
907 match &arg_types[0] {
908 List(field)
909 | LargeList(field)
910 | FixedSizeList(field, _) => Ok(field.data_type().clone()),
911 _ => plan_err!(
912 "array_any_value can only accept List, LargeList or FixedSizeList as the argument"
913 ),
914 }
915 }
916
917 fn invoke_with_args(
918 &self,
919 args: datafusion_expr::ScalarFunctionArgs,
920 ) -> Result<ColumnarValue> {
921 make_scalar_function(array_any_value_inner)(&args.args)
922 }
923
924 fn aliases(&self) -> &[String] {
925 &self.aliases
926 }
927
928 fn documentation(&self) -> Option<&Documentation> {
929 self.doc()
930 }
931}
932
933fn array_any_value_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
934 let [array] = take_function_args("array_any_value", args)?;
935
936 match &array.data_type() {
937 List(_) => {
938 let array = as_list_array(&array)?;
939 general_array_any_value::<i32>(array)
940 }
941 LargeList(_) => {
942 let array = as_large_list_array(&array)?;
943 general_array_any_value::<i64>(array)
944 }
945 data_type => exec_err!("array_any_value does not support type: {:?}", data_type),
946 }
947}
948
949fn general_array_any_value<O: OffsetSizeTrait>(
950 array: &GenericListArray<O>,
951) -> Result<ArrayRef>
952where
953 i64: TryInto<O>,
954{
955 let values = array.values();
956 let original_data = values.to_data();
957 let capacity = Capacities::Array(array.len());
958
959 let mut mutable =
960 MutableArrayData::with_capacities(vec![&original_data], true, capacity);
961
962 for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
963 let start = offset_window[0];
964 let end = offset_window[1];
965 let len = end - start;
966
967 if len == O::usize_as(0) {
969 mutable.extend_nulls(1);
970 continue;
971 }
972
973 let row_value = array.value(row_index);
974 match row_value.nulls() {
975 Some(row_nulls_buffer) => {
976 if let Some(first_non_null_index) =
978 row_nulls_buffer.valid_indices().next()
979 {
980 let index = start.as_usize() + first_non_null_index;
981 mutable.extend(0, index, index + 1)
982 } else {
983 mutable.extend_nulls(1);
985 }
986 }
987 None => {
988 let index = start.as_usize();
990 mutable.extend(0, index, index + 1);
991 }
992 }
993 }
994
995 let data = mutable.freeze();
996 Ok(arrow::array::make_array(data))
997}
998
#[cfg(test)]
mod tests {
    use super::array_element_udf;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{Column, DFSchema};
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{Expr, ExprSchemable};
    use std::collections::HashMap;

    /// `array_element` on a list of fixed-size lists must report the inner
    /// fixed-size list type unchanged, both when asked directly and when the
    /// type is resolved through an expression against a schema.
    #[test]
    fn test_array_element_return_type_fixed_size_list() {
        let element_type = DataType::FixedSizeList(
            Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
            13,
        );
        let list_type =
            DataType::List(Field::new_list_field(element_type.clone(), true).into());
        let index_type = DataType::Int64;

        // 1. Ask the UDF directly.
        let direct = array_element_udf()
            .return_type(&[list_type.clone(), index_type.clone()])
            .unwrap();
        assert_eq!(direct, element_type);

        // 2. Resolve the type of a call expression over matching columns.
        let columns = vec![
            Field::new("my_array", list_type, false),
            Field::new("my_index", index_type, false),
        ];
        let schema =
            DFSchema::from_unqualified_fields(columns.into(), HashMap::default())
                .unwrap();

        let call = Expr::ScalarFunction(ScalarFunction {
            func: array_element_udf(),
            args: ["my_array", "my_index"]
                .iter()
                .map(|name| Expr::Column(Column::new_unqualified(*name)))
                .collect(),
        });
        let resolved = ExprSchemable::get_type(&call, &schema).unwrap();
        assert_eq!(resolved, element_type);
    }
}