datafusion_functions_nested/
flatten.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for flatten function.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{
24    DataType,
25    DataType::{FixedSizeList, LargeList, List, Null},
26};
27use datafusion_common::cast::{as_large_list_array, as_list_array};
28use datafusion_common::{Result, exec_err, utils::take_function_args};
29use datafusion_expr::{
30    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
31};
32use datafusion_macros::user_doc;
33use std::any::Any;
34use std::sync::Arc;
35
// Registers the `Flatten` UDF through the crate's `make_udf_expr_and_func!`
// macro. NOTE(review): the macro is defined outside this file; presumably it
// generates an expression-builder function named `flatten` taking one `array`
// argument and an accessor named `flatten_udf` -- confirm against the macro
// definition.
make_udf_expr_and_func!(
    Flatten,
    flatten,
    array,
    "flattens an array of arrays into a single array.",
    flatten_udf
);
43
44#[user_doc(
45    doc_section(label = "Array Functions"),
46    description = "Converts an array of arrays to a flat array.\n\n- Applies to any depth of nested arrays\n- Does not change arrays that are already flat\n\nThe flattened array contains all the elements from all source arrays.",
47    syntax_example = "flatten(array)",
48    sql_example = r#"```sql
49> select flatten([[1, 2], [3, 4]]);
50+------------------------------+
51| flatten(List([1,2], [3,4]))  |
52+------------------------------+
53| [1, 2, 3, 4]                 |
54+------------------------------+
55```"#,
56    argument(
57        name = "array",
58        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
59    )
60)]
61#[derive(Debug, PartialEq, Eq, Hash)]
62pub struct Flatten {
63    signature: Signature,
64    aliases: Vec<String>,
65}
66
67impl Default for Flatten {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl Flatten {
74    pub fn new() -> Self {
75        Self {
76            signature: Signature::array(Volatility::Immutable),
77            aliases: vec![],
78        }
79    }
80}
81
82impl ScalarUDFImpl for Flatten {
83    fn as_any(&self) -> &dyn Any {
84        self
85    }
86
87    fn name(&self) -> &str {
88        "flatten"
89    }
90
91    fn signature(&self) -> &Signature {
92        &self.signature
93    }
94
95    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
96        let data_type = match &arg_types[0] {
97            List(field) => match field.data_type() {
98                List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
99                LargeList(field) => LargeList(Arc::clone(field)),
100                _ => arg_types[0].clone(),
101            },
102            LargeList(field) => match field.data_type() {
103                List(field) | LargeList(field) | FixedSizeList(field, _) => {
104                    LargeList(Arc::clone(field))
105                }
106                _ => arg_types[0].clone(),
107            },
108            Null => Null,
109            _ => exec_err!(
110                "Not reachable, data_type should be List, LargeList or FixedSizeList"
111            )?,
112        };
113
114        Ok(data_type)
115    }
116
117    fn invoke_with_args(
118        &self,
119        args: datafusion_expr::ScalarFunctionArgs,
120    ) -> Result<ColumnarValue> {
121        make_scalar_function(flatten_inner)(&args.args)
122    }
123
124    fn aliases(&self) -> &[String] {
125        &self.aliases
126    }
127
128    fn documentation(&self) -> Option<&Documentation> {
129        self.doc()
130    }
131}
132
133fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
134    let [array] = take_function_args("flatten", args)?;
135
136    match array.data_type() {
137        List(_) => {
138            let (_field, offsets, values, nulls) =
139                as_list_array(&array)?.clone().into_parts();
140            let values = cast_fsl_to_list(values)?;
141
142            match values.data_type() {
143                List(_) => {
144                    let (inner_field, inner_offsets, inner_values, _) =
145                        as_list_array(&values)?.clone().into_parts();
146                    let offsets =
147                        get_offsets_for_flatten::<i32, i32>(inner_offsets, &offsets);
148                    let flattened_array = GenericListArray::<i32>::new(
149                        inner_field,
150                        offsets,
151                        inner_values,
152                        nulls,
153                    );
154
155                    Ok(Arc::new(flattened_array) as ArrayRef)
156                }
157                LargeList(_) => {
158                    let (inner_field, inner_offsets, inner_values, _) =
159                        as_large_list_array(&values)?.clone().into_parts();
160                    let offsets =
161                        get_offsets_for_flatten::<i64, i32>(inner_offsets, &offsets);
162                    let flattened_array = GenericListArray::<i64>::new(
163                        inner_field,
164                        offsets,
165                        inner_values,
166                        nulls,
167                    );
168                    Ok(Arc::new(flattened_array) as ArrayRef)
169                }
170                _ => Ok(Arc::clone(array) as ArrayRef),
171            }
172        }
173        LargeList(_) => {
174            let (_field, offsets, values, nulls) =
175                as_large_list_array(&array)?.clone().into_parts();
176            let values = cast_fsl_to_list(values)?;
177
178            match values.data_type() {
179                List(_) => {
180                    let (inner_field, inner_offsets, inner_values, _) =
181                        as_list_array(&values)?.clone().into_parts();
182                    let offsets = get_large_offsets_for_flatten(inner_offsets, &offsets);
183                    let flattened_array = GenericListArray::<i64>::new(
184                        inner_field,
185                        offsets,
186                        inner_values,
187                        nulls,
188                    );
189
190                    Ok(Arc::new(flattened_array) as ArrayRef)
191                }
192                LargeList(_) => {
193                    let (inner_field, inner_offsets, inner_values, _) =
194                        as_large_list_array(&values)?.clone().into_parts();
195                    let offsets =
196                        get_offsets_for_flatten::<i64, i64>(inner_offsets, &offsets);
197                    let flattened_array = GenericListArray::<i64>::new(
198                        inner_field,
199                        offsets,
200                        inner_values,
201                        nulls,
202                    );
203
204                    Ok(Arc::new(flattened_array) as ArrayRef)
205                }
206                _ => Ok(Arc::clone(array) as ArrayRef),
207            }
208        }
209        Null => Ok(Arc::clone(array)),
210        _ => {
211            exec_err!("flatten does not support type '{:?}'", array.data_type())
212        }
213    }
214}
215
216// Create new offsets that are equivalent to `flatten` the array.
217fn get_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
218    inner_offsets: OffsetBuffer<O>,
219    outer_offsets: &OffsetBuffer<P>,
220) -> OffsetBuffer<O> {
221    let buffer = inner_offsets.into_inner();
222    let offsets: Vec<O> = outer_offsets
223        .iter()
224        .map(|i| buffer[i.to_usize().unwrap()])
225        .collect();
226    OffsetBuffer::new(offsets.into())
227}
228
229// Create new large offsets that are equivalent to `flatten` the array.
230fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
231    inner_offsets: OffsetBuffer<O>,
232    outer_offsets: &OffsetBuffer<P>,
233) -> OffsetBuffer<i64> {
234    let buffer = inner_offsets.into_inner();
235    let offsets: Vec<i64> = outer_offsets
236        .iter()
237        .map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
238        .collect();
239    OffsetBuffer::new(offsets.into())
240}
241
242fn cast_fsl_to_list(array: ArrayRef) -> Result<ArrayRef> {
243    match array.data_type() {
244        FixedSizeList(field, _) => {
245            Ok(arrow::compute::cast(&array, &List(Arc::clone(field)))?)
246        }
247        _ => Ok(array),
248    }
249}