datafusion_functions_nested/
flatten.rs1use crate::utils::make_scalar_function;
21use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{
24 DataType,
25 DataType::{FixedSizeList, LargeList, List, Null},
26};
27use datafusion_common::cast::{as_large_list_array, as_list_array};
28use datafusion_common::{exec_err, utils::take_function_args, Result};
29use datafusion_expr::{
30 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
31};
32use datafusion_macros::user_doc;
33use std::any::Any;
34use std::sync::Arc;
35
// Registers the `Flatten` UDF with the crate's expression-function helpers.
// NOTE(review): the macro definition is not visible here. The arguments
// presumably produce an `expr_fn` named `flatten` taking one `array` argument
// (documented by the string below) and a `flatten_udf` accessor for the UDF —
// confirm against `make_udf_expr_and_func!`.
make_udf_expr_and_func!(
    Flatten,
    flatten,
    array,
    "flattens an array of arrays into a single array.",
    flatten_udf
);
43
44#[user_doc(
45 doc_section(label = "Array Functions"),
46 description = "Converts an array of arrays to a flat array.\n\n- Applies to any depth of nested arrays\n- Does not change arrays that are already flat\n\nThe flattened array contains all the elements from all source arrays.",
47 syntax_example = "flatten(array)",
48 sql_example = r#"```sql
49> select flatten([[1, 2], [3, 4]]);
50+------------------------------+
51| flatten(List([1,2], [3,4])) |
52+------------------------------+
53| [1, 2, 3, 4] |
54+------------------------------+
55```"#,
56 argument(
57 name = "array",
58 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
59 )
60)]
61#[derive(Debug, PartialEq, Eq, Hash)]
62pub struct Flatten {
63 signature: Signature,
64 aliases: Vec<String>,
65}
66
67impl Default for Flatten {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl Flatten {
74 pub fn new() -> Self {
75 Self {
76 signature: Signature::array(Volatility::Immutable),
77 aliases: vec![],
78 }
79 }
80}
81
82impl ScalarUDFImpl for Flatten {
83 fn as_any(&self) -> &dyn Any {
84 self
85 }
86
87 fn name(&self) -> &str {
88 "flatten"
89 }
90
91 fn signature(&self) -> &Signature {
92 &self.signature
93 }
94
95 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
96 let data_type = match &arg_types[0] {
97 List(field) => match field.data_type() {
98 List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
99 LargeList(field) => LargeList(Arc::clone(field)),
100 _ => arg_types[0].clone(),
101 },
102 LargeList(field) => match field.data_type() {
103 List(field) | LargeList(field) | FixedSizeList(field, _) => {
104 LargeList(Arc::clone(field))
105 }
106 _ => arg_types[0].clone(),
107 },
108 Null => Null,
109 _ => exec_err!(
110 "Not reachable, data_type should be List, LargeList or FixedSizeList"
111 )?,
112 };
113
114 Ok(data_type)
115 }
116
117 fn invoke_with_args(
118 &self,
119 args: datafusion_expr::ScalarFunctionArgs,
120 ) -> Result<ColumnarValue> {
121 make_scalar_function(flatten_inner)(&args.args)
122 }
123
124 fn aliases(&self) -> &[String] {
125 &self.aliases
126 }
127
128 fn documentation(&self) -> Option<&Documentation> {
129 self.doc()
130 }
131}
132
133pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
135 let [array] = take_function_args("flatten", args)?;
136
137 match array.data_type() {
138 List(_) => {
139 let (_field, offsets, values, nulls) =
140 as_list_array(&array)?.clone().into_parts();
141 let values = cast_fsl_to_list(values)?;
142
143 match values.data_type() {
144 List(_) => {
145 let (inner_field, inner_offsets, inner_values, _) =
146 as_list_array(&values)?.clone().into_parts();
147 let offsets =
148 get_offsets_for_flatten::<i32, i32>(inner_offsets, offsets);
149 let flattened_array = GenericListArray::<i32>::new(
150 inner_field,
151 offsets,
152 inner_values,
153 nulls,
154 );
155
156 Ok(Arc::new(flattened_array) as ArrayRef)
157 }
158 LargeList(_) => {
159 let (inner_field, inner_offsets, inner_values, _) =
160 as_large_list_array(&values)?.clone().into_parts();
161 let offsets =
162 get_offsets_for_flatten::<i64, i32>(inner_offsets, offsets);
163 let flattened_array = GenericListArray::<i64>::new(
164 inner_field,
165 offsets,
166 inner_values,
167 nulls,
168 );
169 Ok(Arc::new(flattened_array) as ArrayRef)
170 }
171 _ => Ok(Arc::clone(array) as ArrayRef),
172 }
173 }
174 LargeList(_) => {
175 let (_field, offsets, values, nulls) =
176 as_large_list_array(&array)?.clone().into_parts();
177 let values = cast_fsl_to_list(values)?;
178
179 match values.data_type() {
180 List(_) => {
181 let (inner_field, inner_offsets, inner_values, _) =
182 as_list_array(&values)?.clone().into_parts();
183 let offsets = get_large_offsets_for_flatten(inner_offsets, offsets);
184 let flattened_array = GenericListArray::<i64>::new(
185 inner_field,
186 offsets,
187 inner_values,
188 nulls,
189 );
190
191 Ok(Arc::new(flattened_array) as ArrayRef)
192 }
193 LargeList(_) => {
194 let (inner_field, inner_offsets, inner_values, _) =
195 as_large_list_array(&values)?.clone().into_parts();
196 let offsets =
197 get_offsets_for_flatten::<i64, i64>(inner_offsets, offsets);
198 let flattened_array = GenericListArray::<i64>::new(
199 inner_field,
200 offsets,
201 inner_values,
202 nulls,
203 );
204
205 Ok(Arc::new(flattened_array) as ArrayRef)
206 }
207 _ => Ok(Arc::clone(array) as ArrayRef),
208 }
209 }
210 Null => Ok(Arc::clone(array)),
211 _ => {
212 exec_err!("flatten does not support type '{:?}'", array.data_type())
213 }
214 }
215}
216
217fn get_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
219 inner_offsets: OffsetBuffer<O>,
220 outer_offsets: OffsetBuffer<P>,
221) -> OffsetBuffer<O> {
222 let buffer = inner_offsets.into_inner();
223 let offsets: Vec<O> = outer_offsets
224 .iter()
225 .map(|i| buffer[i.to_usize().unwrap()])
226 .collect();
227 OffsetBuffer::new(offsets.into())
228}
229
230fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
232 inner_offsets: OffsetBuffer<O>,
233 outer_offsets: OffsetBuffer<P>,
234) -> OffsetBuffer<i64> {
235 let buffer = inner_offsets.into_inner();
236 let offsets: Vec<i64> = outer_offsets
237 .iter()
238 .map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
239 .collect();
240 OffsetBuffer::new(offsets.into())
241}
242
243fn cast_fsl_to_list(array: ArrayRef) -> Result<ArrayRef> {
244 match array.data_type() {
245 FixedSizeList(field, _) => {
246 Ok(arrow::compute::cast(&array, &List(Arc::clone(field)))?)
247 }
248 _ => Ok(array),
249 }
250}