Skip to main content

datafusion_functions_nested/
sort.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_sort function.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_null_array};
22use arrow::buffer::OffsetBuffer;
23use arrow::compute::SortColumn;
24use arrow::datatypes::{DataType, FieldRef};
25use arrow::{compute, compute::SortOptions};
26use datafusion_common::cast::{as_large_list_array, as_list_array, as_string_array};
27use datafusion_common::utils::ListCoercion;
28use datafusion_common::{Result, exec_err};
29use datafusion_expr::{
30    ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation,
31    ScalarUDFImpl, Signature, TypeSignature, Volatility,
32};
33use datafusion_macros::user_doc;
34use std::any::Any;
35use std::sync::Arc;
36
37make_udf_expr_and_func!(
38    ArraySort,
39    array_sort,
40    array desc null_first,
41    "returns sorted array.",
42    array_sort_udf
43);
44
45/// Implementation of `array_sort` function
46///
47/// `array_sort` sorts the elements of an array
48///
49/// # Example
50///
51/// `array_sort([3, 1, 2])` returns `[1, 2, 3]`
52#[user_doc(
53    doc_section(label = "Array Functions"),
54    description = "Sort array.",
55    syntax_example = "array_sort(array, desc, nulls_first)",
56    sql_example = r#"```sql
57> select array_sort([3, 1, 2]);
58+-----------------------------+
59| array_sort(List([3,1,2]))   |
60+-----------------------------+
61| [1, 2, 3]                   |
62+-----------------------------+
63```"#,
64    argument(
65        name = "array",
66        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
67    ),
68    argument(
69        name = "desc",
70        description = "Whether to sort in descending order(`ASC` or `DESC`)."
71    ),
72    argument(
73        name = "nulls_first",
74        description = "Whether to sort nulls first(`NULLS FIRST` or `NULLS LAST`)."
75    )
76)]
77#[derive(Debug, PartialEq, Eq, Hash)]
78pub struct ArraySort {
79    signature: Signature,
80    aliases: Vec<String>,
81}
82
83impl Default for ArraySort {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89impl ArraySort {
90    pub fn new() -> Self {
91        Self {
92            signature: Signature::one_of(
93                vec![
94                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
95                        arguments: vec![ArrayFunctionArgument::Array],
96                        array_coercion: Some(ListCoercion::FixedSizedListToList),
97                    }),
98                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
99                        arguments: vec![
100                            ArrayFunctionArgument::Array,
101                            ArrayFunctionArgument::String,
102                        ],
103                        array_coercion: Some(ListCoercion::FixedSizedListToList),
104                    }),
105                    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
106                        arguments: vec![
107                            ArrayFunctionArgument::Array,
108                            ArrayFunctionArgument::String,
109                            ArrayFunctionArgument::String,
110                        ],
111                        array_coercion: Some(ListCoercion::FixedSizedListToList),
112                    }),
113                ],
114                Volatility::Immutable,
115            ),
116            aliases: vec!["list_sort".to_string()],
117        }
118    }
119}
120
121impl ScalarUDFImpl for ArraySort {
122    fn as_any(&self) -> &dyn Any {
123        self
124    }
125
126    fn name(&self) -> &str {
127        "array_sort"
128    }
129
130    fn signature(&self) -> &Signature {
131        &self.signature
132    }
133
134    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
135        Ok(arg_types[0].clone())
136    }
137
138    fn invoke_with_args(
139        &self,
140        args: datafusion_expr::ScalarFunctionArgs,
141    ) -> Result<ColumnarValue> {
142        make_scalar_function(array_sort_inner)(&args.args)
143    }
144
145    fn aliases(&self) -> &[String] {
146        &self.aliases
147    }
148
149    fn documentation(&self) -> Option<&Documentation> {
150        self.doc()
151    }
152}
153
154fn array_sort_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
155    if args.is_empty() || args.len() > 3 {
156        return exec_err!("array_sort expects one to three arguments");
157    }
158
159    if args[0].is_empty() || args[0].data_type().is_null() {
160        return Ok(Arc::clone(&args[0]));
161    }
162
163    if args[1..].iter().any(|array| array.is_null(0)) {
164        return Ok(new_null_array(args[0].data_type(), args[0].len()));
165    }
166
167    let sort_options = match args.len() {
168        1 => None,
169        2 => {
170            let sort = as_string_array(&args[1])?.value(0);
171            Some(SortOptions {
172                descending: order_desc(sort)?,
173                nulls_first: true,
174            })
175        }
176        3 => {
177            let sort = as_string_array(&args[1])?.value(0);
178            let nulls_first = as_string_array(&args[2])?.value(0);
179            Some(SortOptions {
180                descending: order_desc(sort)?,
181                nulls_first: order_nulls_first(nulls_first)?,
182            })
183        }
184        // We guard at the top
185        _ => unreachable!(),
186    };
187
188    match args[0].data_type() {
189        DataType::List(field) | DataType::LargeList(field)
190            if field.data_type().is_null() =>
191        {
192            Ok(Arc::clone(&args[0]))
193        }
194        DataType::List(field) => {
195            let array = as_list_array(&args[0])?;
196            array_sort_generic(array, Arc::clone(field), sort_options)
197        }
198        DataType::LargeList(field) => {
199            let array = as_large_list_array(&args[0])?;
200            array_sort_generic(array, Arc::clone(field), sort_options)
201        }
202        // Signature should prevent this arm ever occurring
203        _ => exec_err!("array_sort expects list for first argument"),
204    }
205}
206
207fn array_sort_generic<OffsetSize: OffsetSizeTrait>(
208    list_array: &GenericListArray<OffsetSize>,
209    field: FieldRef,
210    sort_options: Option<SortOptions>,
211) -> Result<ArrayRef> {
212    let row_count = list_array.len();
213
214    let mut array_lengths = vec![];
215    let mut arrays = vec![];
216    for i in 0..row_count {
217        if list_array.is_null(i) {
218            array_lengths.push(0);
219        } else {
220            let arr_ref = list_array.value(i);
221
222            // arrow sort kernel does not support Structs, so use
223            // lexsort_to_indices instead:
224            // https://github.com/apache/arrow-rs/issues/6911#issuecomment-2562928843
225            let sorted_array = match arr_ref.data_type() {
226                DataType::Struct(_) => {
227                    let sort_columns: Vec<SortColumn> = vec![SortColumn {
228                        values: Arc::clone(&arr_ref),
229                        options: sort_options,
230                    }];
231                    let indices = compute::lexsort_to_indices(&sort_columns, None)?;
232                    compute::take(arr_ref.as_ref(), &indices, None)?
233                }
234                _ => {
235                    let arr_ref = arr_ref.as_ref();
236                    compute::sort(arr_ref, sort_options)?
237                }
238            };
239            array_lengths.push(sorted_array.len());
240            arrays.push(sorted_array);
241        }
242    }
243
244    let elements = arrays
245        .iter()
246        .map(|a| a.as_ref())
247        .collect::<Vec<&dyn Array>>();
248
249    let list_arr = if elements.is_empty() {
250        GenericListArray::<OffsetSize>::new_null(field, row_count)
251    } else {
252        GenericListArray::<OffsetSize>::new(
253            field,
254            OffsetBuffer::from_lengths(array_lengths),
255            Arc::new(compute::concat(elements.as_slice())?),
256            list_array.nulls().cloned(),
257        )
258    };
259    Ok(Arc::new(list_arr))
260}
261
262fn order_desc(modifier: &str) -> Result<bool> {
263    match modifier.to_uppercase().as_str() {
264        "DESC" => Ok(true),
265        "ASC" => Ok(false),
266        _ => exec_err!("the second parameter of array_sort expects DESC or ASC"),
267    }
268}
269
270fn order_nulls_first(modifier: &str) -> Result<bool> {
271    match modifier.to_uppercase().as_str() {
272        "NULLS FIRST" => Ok(true),
273        "NULLS LAST" => Ok(false),
274        _ => exec_err!(
275            "the third parameter of array_sort expects NULLS FIRST or NULLS LAST"
276        ),
277    }
278}