datafusion_functions_nested/
min_max.rs1use crate::utils::make_scalar_function;
20use arrow::array::{ArrayRef, GenericListArray, OffsetSizeTrait};
21use arrow::datatypes::DataType;
22use arrow::datatypes::DataType::{LargeList, List};
23use datafusion_common::cast::{as_large_list_array, as_list_array};
24use datafusion_common::utils::take_function_args;
25use datafusion_common::Result;
26use datafusion_common::{exec_err, plan_err, ScalarValue};
27use datafusion_doc::Documentation;
28use datafusion_expr::{
29 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
32use datafusion_macros::user_doc;
33use itertools::Itertools;
34use std::any::Any;
35
// Registers `ArrayMax` through the `make_udf_expr_and_func!` macro, which is defined
// outside this file.
// NOTE(review): presumably this generates an `array_max(array)` expression builder
// (documented with the string below) and an `array_max_udf()` accessor -- confirm
// against the macro definition.
make_udf_expr_and_func!(
    ArrayMax,
    array_max,
    array,
    "returns the maximum value in the array.",
    array_max_udf
);
43
// Scalar UDF type for the SQL function `array_max`.
//
// The `user_doc` attribute carries the user-facing documentation (section, description,
// syntax, SQL example and argument description) for the function.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns the maximum value in the array.",
    syntax_example = "array_max(array)",
    sql_example = r#"```sql
> select array_max([3,1,4,2]);
+-----------------------------------------+
| array_max(List([3,1,4,2])) |
+-----------------------------------------+
| 4 |
+-----------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug)]
pub struct ArrayMax {
    // Argument signature reported through `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative names reported through `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
66
67impl Default for ArrayMax {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl ArrayMax {
74 pub fn new() -> Self {
75 Self {
76 signature: Signature::array(Volatility::Immutable),
77 aliases: vec!["list_max".to_string()],
78 }
79 }
80}
81
82impl ScalarUDFImpl for ArrayMax {
83 fn as_any(&self) -> &dyn Any {
84 self
85 }
86
87 fn name(&self) -> &str {
88 "array_max"
89 }
90
91 fn signature(&self) -> &Signature {
92 &self.signature
93 }
94
95 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
96 let [array] = take_function_args(self.name(), arg_types)?;
97 match array {
98 List(field) | LargeList(field) => Ok(field.data_type().clone()),
99 arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
100 }
101 }
102
103 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
104 make_scalar_function(array_max_inner)(&args.args)
105 }
106
107 fn aliases(&self) -> &[String] {
108 &self.aliases
109 }
110
111 fn documentation(&self) -> Option<&Documentation> {
112 self.doc()
113 }
114}
115
116pub fn array_max_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
124 let [array] = take_function_args("array_max", args)?;
125 match array.data_type() {
126 List(_) => array_min_max_helper(as_list_array(array)?, max_batch),
127 LargeList(_) => array_min_max_helper(as_large_list_array(array)?, max_batch),
128 arg_type => exec_err!("array_max does not support type: {arg_type}"),
129 }
130}
131
// Registers `ArrayMin` through the `make_udf_expr_and_func!` macro, which is defined
// outside this file.
// NOTE(review): presumably this generates an `array_min(array)` expression builder
// (documented with the string below) and an `array_min_udf()` accessor -- confirm
// against the macro definition.
make_udf_expr_and_func!(
    ArrayMin,
    array_min,
    array,
    "returns the minimum value in the array",
    array_min_udf
);
// Scalar UDF type for the SQL function `array_min`.
//
// The `user_doc` attribute carries the user-facing documentation (section, description,
// syntax, SQL example and argument description) for the function.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns the minimum value in the array.",
    syntax_example = "array_min(array)",
    sql_example = r#"```sql
> select array_min([3,1,4,2]);
+-----------------------------------------+
| array_min(List([3,1,4,2])) |
+-----------------------------------------+
| 1 |
+-----------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug)]
struct ArrayMin {
    // Argument signature reported through `ScalarUDFImpl::signature`.
    signature: Signature,
}
160
161impl Default for ArrayMin {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl ArrayMin {
168 fn new() -> Self {
169 Self {
170 signature: Signature::array(Volatility::Immutable),
171 }
172 }
173}
174
175impl ScalarUDFImpl for ArrayMin {
176 fn as_any(&self) -> &dyn Any {
177 self
178 }
179
180 fn name(&self) -> &str {
181 "array_min"
182 }
183
184 fn signature(&self) -> &Signature {
185 &self.signature
186 }
187
188 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
189 let [array] = take_function_args(self.name(), arg_types)?;
190 match array {
191 List(field) | LargeList(field) => Ok(field.data_type().clone()),
192 arg_type => plan_err!("{} does not support type {}", self.name(), arg_type),
193 }
194 }
195
196 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
197 make_scalar_function(array_min_inner)(&args.args)
198 }
199
200 fn documentation(&self) -> Option<&Documentation> {
201 self.doc()
202 }
203}
204
205pub fn array_min_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
206 let [array] = take_function_args("array_min", args)?;
207 match array.data_type() {
208 List(_) => array_min_max_helper(as_list_array(array)?, min_batch),
209 LargeList(_) => array_min_max_helper(as_large_list_array(array)?, min_batch),
210 arg_type => exec_err!("array_min does not support type: {arg_type}"),
211 }
212}
213
214fn array_min_max_helper<O: OffsetSizeTrait>(
215 array: &GenericListArray<O>,
216 agg_fn: fn(&ArrayRef) -> Result<ScalarValue>,
217) -> Result<ArrayRef> {
218 let null_value = ScalarValue::try_from(array.value_type())?;
219 let result_vec: Vec<ScalarValue> = array
220 .iter()
221 .map(|arr| arr.as_ref().map_or_else(|| Ok(null_value.clone()), agg_fn))
222 .try_collect()?;
223 ScalarValue::iter_to_array(result_vec)
224}