// datafusion_functions_nested/flatten.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! [`ScalarUDFImpl`] definition for the `flatten` function.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{
24    DataType,
25    DataType::{FixedSizeList, LargeList, List, Null},
26};
27use datafusion_common::cast::{as_large_list_array, as_list_array};
28use datafusion_common::{Result, exec_err, utils::take_function_args};
29use datafusion_expr::{
30    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
31    Volatility,
32};
33use datafusion_macros::user_doc;
34use std::sync::Arc;
35
36make_udf_expr_and_func!(
37    Flatten,
38    flatten,
39    array,
40    "flattens an array of arrays into a single array.",
41    flatten_udf
42);
43
44#[user_doc(
45    doc_section(label = "Array Functions"),
46    description = "Converts an array of arrays to a flat array.\n\n- Applies to any depth of nested arrays\n- Does not change arrays that are already flat\n\nThe flattened array contains all the elements from all source arrays.",
47    syntax_example = "flatten(array)",
48    sql_example = r#"```sql
49> select flatten([[1, 2], [3, 4]]);
50+------------------------------+
51| flatten(List([1,2], [3,4]))  |
52+------------------------------+
53| [1, 2, 3, 4]                 |
54+------------------------------+
55```"#,
56    argument(
57        name = "array",
58        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
59    )
60)]
61#[derive(Debug, PartialEq, Eq, Hash)]
62pub struct Flatten {
63    signature: Signature,
64    aliases: Vec<String>,
65}
66
67impl Default for Flatten {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl Flatten {
74    pub fn new() -> Self {
75        Self {
76            signature: Signature::array(Volatility::Immutable),
77            aliases: vec![],
78        }
79    }
80}
81
82impl ScalarUDFImpl for Flatten {
83    fn name(&self) -> &str {
84        "flatten"
85    }
86
87    fn signature(&self) -> &Signature {
88        &self.signature
89    }
90
91    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
92        let data_type = match &arg_types[0] {
93            List(field) => match field.data_type() {
94                List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
95                LargeList(field) => LargeList(Arc::clone(field)),
96                _ => arg_types[0].clone(),
97            },
98            LargeList(field) => match field.data_type() {
99                List(field) | LargeList(field) | FixedSizeList(field, _) => {
100                    LargeList(Arc::clone(field))
101                }
102                _ => arg_types[0].clone(),
103            },
104            Null => Null,
105            _ => exec_err!(
106                "Not reachable, data_type should be List, LargeList or FixedSizeList"
107            )?,
108        };
109
110        Ok(data_type)
111    }
112
113    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
114        make_scalar_function(flatten_inner)(&args.args)
115    }
116
117    fn aliases(&self) -> &[String] {
118        &self.aliases
119    }
120
121    fn documentation(&self) -> Option<&Documentation> {
122        self.doc()
123    }
124}
125
126fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
127    let [array] = take_function_args("flatten", args)?;
128
129    match array.data_type() {
130        List(_) => {
131            let (_field, offsets, values, nulls) =
132                as_list_array(&array)?.clone().into_parts();
133            let values = cast_fsl_to_list(values)?;
134
135            match values.data_type() {
136                List(_) => {
137                    let (inner_field, inner_offsets, inner_values, _) =
138                        as_list_array(&values)?.clone().into_parts();
139                    let offsets =
140                        get_offsets_for_flatten::<i32, i32>(inner_offsets, &offsets);
141                    let flattened_array = GenericListArray::<i32>::new(
142                        inner_field,
143                        offsets,
144                        inner_values,
145                        nulls,
146                    );
147
148                    Ok(Arc::new(flattened_array) as ArrayRef)
149                }
150                LargeList(_) => {
151                    let (inner_field, inner_offsets, inner_values, _) =
152                        as_large_list_array(&values)?.clone().into_parts();
153                    let offsets =
154                        get_offsets_for_flatten::<i64, i32>(inner_offsets, &offsets);
155                    let flattened_array = GenericListArray::<i64>::new(
156                        inner_field,
157                        offsets,
158                        inner_values,
159                        nulls,
160                    );
161                    Ok(Arc::new(flattened_array) as ArrayRef)
162                }
163                _ => Ok(Arc::clone(array) as ArrayRef),
164            }
165        }
166        LargeList(_) => {
167            let (_field, offsets, values, nulls) =
168                as_large_list_array(&array)?.clone().into_parts();
169            let values = cast_fsl_to_list(values)?;
170
171            match values.data_type() {
172                List(_) => {
173                    let (inner_field, inner_offsets, inner_values, _) =
174                        as_list_array(&values)?.clone().into_parts();
175                    let offsets = get_large_offsets_for_flatten(inner_offsets, &offsets);
176                    let flattened_array = GenericListArray::<i64>::new(
177                        inner_field,
178                        offsets,
179                        inner_values,
180                        nulls,
181                    );
182
183                    Ok(Arc::new(flattened_array) as ArrayRef)
184                }
185                LargeList(_) => {
186                    let (inner_field, inner_offsets, inner_values, _) =
187                        as_large_list_array(&values)?.clone().into_parts();
188                    let offsets =
189                        get_offsets_for_flatten::<i64, i64>(inner_offsets, &offsets);
190                    let flattened_array = GenericListArray::<i64>::new(
191                        inner_field,
192                        offsets,
193                        inner_values,
194                        nulls,
195                    );
196
197                    Ok(Arc::new(flattened_array) as ArrayRef)
198                }
199                _ => Ok(Arc::clone(array) as ArrayRef),
200            }
201        }
202        Null => Ok(Arc::clone(array)),
203        _ => {
204            exec_err!("flatten does not support type '{}'", array.data_type())
205        }
206    }
207}
208
209// Create new offsets that are equivalent to `flatten` the array.
210fn get_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
211    inner_offsets: OffsetBuffer<O>,
212    outer_offsets: &OffsetBuffer<P>,
213) -> OffsetBuffer<O> {
214    let buffer = inner_offsets.into_inner();
215    let offsets: Vec<O> = outer_offsets
216        .iter()
217        .map(|i| buffer[i.to_usize().unwrap()])
218        .collect();
219    OffsetBuffer::new(offsets.into())
220}
221
222// Create new large offsets that are equivalent to `flatten` the array.
223fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
224    inner_offsets: OffsetBuffer<O>,
225    outer_offsets: &OffsetBuffer<P>,
226) -> OffsetBuffer<i64> {
227    let buffer = inner_offsets.into_inner();
228    let offsets: Vec<i64> = outer_offsets
229        .iter()
230        .map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
231        .collect();
232    OffsetBuffer::new(offsets.into())
233}
234
235fn cast_fsl_to_list(array: ArrayRef) -> Result<ArrayRef> {
236    match array.data_type() {
237        FixedSizeList(field, _) => {
238            Ok(arrow::compute::cast(&array, &List(Arc::clone(field)))?)
239        }
240        _ => Ok(array),
241    }
242}