datafusion_functions_nested/
flatten.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`ScalarUDFImpl`] definition for the `flatten` function.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{
24    DataType,
25    DataType::{FixedSizeList, LargeList, List, Null},
26};
27use datafusion_common::cast::{as_large_list_array, as_list_array};
28use datafusion_common::{exec_err, utils::take_function_args, Result};
29use datafusion_expr::{
30    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
31};
32use datafusion_macros::user_doc;
33use std::any::Any;
34use std::sync::Arc;
35
// Declares the `Flatten` UDF through the crate's `make_udf_expr_and_func!`
// macro. The arguments are, in order: the implementing type, a function name
// (`flatten`), its single argument name (`array`), a one-line description, and
// an accessor name (`flatten_udf`).
// NOTE(review): the macro is defined outside this file; presumably it emits an
// expression builder `flatten(array)` and a `flatten_udf()` accessor — confirm
// against the macro definition.
make_udf_expr_and_func!(
    Flatten,
    flatten,
    array,
    "flattens an array of arrays into a single array.",
    flatten_udf
);
43
// User-facing documentation (section, description, syntax, SQL example and
// argument list) for `flatten`, supplied through the `user_doc` attribute.
// NOTE(review): the description says "Applies to any depth of nested arrays";
// `flatten_inner` in this file removes exactly one level of nesting per call —
// confirm the wording matches the intended contract.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Converts an array of arrays to a flat array.\n\n- Applies to any depth of nested arrays\n- Does not change arrays that are already flat\n\nThe flattened array contains all the elements from all source arrays.",
    syntax_example = "flatten(array)",
    sql_example = r#"```sql
> select flatten([[1, 2], [3, 4]]);
+------------------------------+
| flatten(List([1,2], [3,4]))  |
+------------------------------+
| [1, 2, 3, 4]                 |
+------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
// Scalar UDF type backing the SQL `flatten` function.
pub struct Flatten {
    // Argument signature and volatility; returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative names for the function; returned by `ScalarUDFImpl::aliases`.
    // `Flatten::new` leaves this empty.
    aliases: Vec<String>,
}
66
67impl Default for Flatten {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl Flatten {
74    pub fn new() -> Self {
75        Self {
76            signature: Signature::array(Volatility::Immutable),
77            aliases: vec![],
78        }
79    }
80}
81
82impl ScalarUDFImpl for Flatten {
83    fn as_any(&self) -> &dyn Any {
84        self
85    }
86
87    fn name(&self) -> &str {
88        "flatten"
89    }
90
91    fn signature(&self) -> &Signature {
92        &self.signature
93    }
94
95    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
96        let data_type = match &arg_types[0] {
97            List(field) => match field.data_type() {
98                List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
99                LargeList(field) => LargeList(Arc::clone(field)),
100                _ => arg_types[0].clone(),
101            },
102            LargeList(field) => match field.data_type() {
103                List(field) | LargeList(field) | FixedSizeList(field, _) => {
104                    LargeList(Arc::clone(field))
105                }
106                _ => arg_types[0].clone(),
107            },
108            Null => Null,
109            _ => exec_err!(
110                "Not reachable, data_type should be List, LargeList or FixedSizeList"
111            )?,
112        };
113
114        Ok(data_type)
115    }
116
117    fn invoke_with_args(
118        &self,
119        args: datafusion_expr::ScalarFunctionArgs,
120    ) -> Result<ColumnarValue> {
121        make_scalar_function(flatten_inner)(&args.args)
122    }
123
124    fn aliases(&self) -> &[String] {
125        &self.aliases
126    }
127
128    fn documentation(&self) -> Option<&Documentation> {
129        self.doc()
130    }
131}
132
133/// Flatten SQL function
134pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
135    let [array] = take_function_args("flatten", args)?;
136
137    match array.data_type() {
138        List(_) => {
139            let (_field, offsets, values, nulls) =
140                as_list_array(&array)?.clone().into_parts();
141            let values = cast_fsl_to_list(values)?;
142
143            match values.data_type() {
144                List(_) => {
145                    let (inner_field, inner_offsets, inner_values, _) =
146                        as_list_array(&values)?.clone().into_parts();
147                    let offsets =
148                        get_offsets_for_flatten::<i32, i32>(inner_offsets, offsets);
149                    let flattened_array = GenericListArray::<i32>::new(
150                        inner_field,
151                        offsets,
152                        inner_values,
153                        nulls,
154                    );
155
156                    Ok(Arc::new(flattened_array) as ArrayRef)
157                }
158                LargeList(_) => {
159                    let (inner_field, inner_offsets, inner_values, _) =
160                        as_large_list_array(&values)?.clone().into_parts();
161                    let offsets =
162                        get_offsets_for_flatten::<i64, i32>(inner_offsets, offsets);
163                    let flattened_array = GenericListArray::<i64>::new(
164                        inner_field,
165                        offsets,
166                        inner_values,
167                        nulls,
168                    );
169                    Ok(Arc::new(flattened_array) as ArrayRef)
170                }
171                _ => Ok(Arc::clone(array) as ArrayRef),
172            }
173        }
174        LargeList(_) => {
175            let (_field, offsets, values, nulls) =
176                as_large_list_array(&array)?.clone().into_parts();
177            let values = cast_fsl_to_list(values)?;
178
179            match values.data_type() {
180                List(_) => {
181                    let (inner_field, inner_offsets, inner_values, _) =
182                        as_list_array(&values)?.clone().into_parts();
183                    let offsets = get_large_offsets_for_flatten(inner_offsets, offsets);
184                    let flattened_array = GenericListArray::<i64>::new(
185                        inner_field,
186                        offsets,
187                        inner_values,
188                        nulls,
189                    );
190
191                    Ok(Arc::new(flattened_array) as ArrayRef)
192                }
193                LargeList(_) => {
194                    let (inner_field, inner_offsets, inner_values, _) =
195                        as_large_list_array(&values)?.clone().into_parts();
196                    let offsets =
197                        get_offsets_for_flatten::<i64, i64>(inner_offsets, offsets);
198                    let flattened_array = GenericListArray::<i64>::new(
199                        inner_field,
200                        offsets,
201                        inner_values,
202                        nulls,
203                    );
204
205                    Ok(Arc::new(flattened_array) as ArrayRef)
206                }
207                _ => Ok(Arc::clone(array) as ArrayRef),
208            }
209        }
210        Null => Ok(Arc::clone(array)),
211        _ => {
212            exec_err!("flatten does not support type '{:?}'", array.data_type())
213        }
214    }
215}
216
217// Create new offsets that are equivalent to `flatten` the array.
218fn get_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
219    inner_offsets: OffsetBuffer<O>,
220    outer_offsets: OffsetBuffer<P>,
221) -> OffsetBuffer<O> {
222    let buffer = inner_offsets.into_inner();
223    let offsets: Vec<O> = outer_offsets
224        .iter()
225        .map(|i| buffer[i.to_usize().unwrap()])
226        .collect();
227    OffsetBuffer::new(offsets.into())
228}
229
230// Create new large offsets that are equivalent to `flatten` the array.
231fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
232    inner_offsets: OffsetBuffer<O>,
233    outer_offsets: OffsetBuffer<P>,
234) -> OffsetBuffer<i64> {
235    let buffer = inner_offsets.into_inner();
236    let offsets: Vec<i64> = outer_offsets
237        .iter()
238        .map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
239        .collect();
240    OffsetBuffer::new(offsets.into())
241}
242
243fn cast_fsl_to_list(array: ArrayRef) -> Result<ArrayRef> {
244    match array.data_type() {
245        FixedSizeList(field, _) => {
246            Ok(arrow::compute::cast(&array, &List(Arc::clone(field)))?)
247        }
248        _ => Ok(array),
249    }
250}