datafusion_functions_nested/
flatten.rs1use crate::utils::make_scalar_function;
21use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{
24 DataType,
25 DataType::{FixedSizeList, LargeList, List, Null},
26};
27use datafusion_common::cast::{as_large_list_array, as_list_array};
28use datafusion_common::{Result, exec_err, utils::take_function_args};
29use datafusion_expr::{
30 ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
31 Volatility,
32};
33use datafusion_macros::user_doc;
34use std::sync::Arc;
35
// Registers the `Flatten` UDF for expression-level use.
// NOTE(review): `make_udf_expr_and_func!` is defined elsewhere in the crate; judging by
// its arguments it presumably generates a `flatten(array)` expression builder (with the
// given one-line description) and a `flatten_udf()` accessor returning the shared UDF
// instance — confirm against the macro definition.
make_udf_expr_and_func!(
    Flatten,
    flatten,
    array,
    "flattens an array of arrays into a single array.",
    flatten_udf
);
43
44#[user_doc(
45 doc_section(label = "Array Functions"),
46 description = "Converts an array of arrays to a flat array.\n\n- Applies to any depth of nested arrays\n- Does not change arrays that are already flat\n\nThe flattened array contains all the elements from all source arrays.",
47 syntax_example = "flatten(array)",
48 sql_example = r#"```sql
49> select flatten([[1, 2], [3, 4]]);
50+------------------------------+
51| flatten(List([1,2], [3,4])) |
52+------------------------------+
53| [1, 2, 3, 4] |
54+------------------------------+
55```"#,
56 argument(
57 name = "array",
58 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
59 )
60)]
/// Scalar UDF implementing `flatten`.
///
/// Removes one level of list nesting. A list of lists becomes a list of the inner
/// element type. A list whose elements are not lists is returned unchanged.
// NOTE(review): the `user_doc` description on this struct says "Applies to any depth of
// nested arrays", but `return_type` and `flatten_inner` only strip a single level of
// nesting (e.g. List<List<List<T>>> -> List<List<T>>) — confirm the intended wording.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Flatten {
    /// Call signature: one array argument with immutable volatility (built in `new`).
    signature: Signature,
    /// Alternative names for the function; empty as constructed by `new`.
    aliases: Vec<String>,
}
66
67impl Default for Flatten {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl Flatten {
74 pub fn new() -> Self {
75 Self {
76 signature: Signature::array(Volatility::Immutable),
77 aliases: vec![],
78 }
79 }
80}
81
82impl ScalarUDFImpl for Flatten {
83 fn name(&self) -> &str {
84 "flatten"
85 }
86
87 fn signature(&self) -> &Signature {
88 &self.signature
89 }
90
91 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
92 let data_type = match &arg_types[0] {
93 List(field) => match field.data_type() {
94 List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
95 LargeList(field) => LargeList(Arc::clone(field)),
96 _ => arg_types[0].clone(),
97 },
98 LargeList(field) => match field.data_type() {
99 List(field) | LargeList(field) | FixedSizeList(field, _) => {
100 LargeList(Arc::clone(field))
101 }
102 _ => arg_types[0].clone(),
103 },
104 Null => Null,
105 _ => exec_err!(
106 "Not reachable, data_type should be List, LargeList or FixedSizeList"
107 )?,
108 };
109
110 Ok(data_type)
111 }
112
113 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
114 make_scalar_function(flatten_inner)(&args.args)
115 }
116
117 fn aliases(&self) -> &[String] {
118 &self.aliases
119 }
120
121 fn documentation(&self) -> Option<&Documentation> {
122 self.doc()
123 }
124}
125
126fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
127 let [array] = take_function_args("flatten", args)?;
128
129 match array.data_type() {
130 List(_) => {
131 let (_field, offsets, values, nulls) =
132 as_list_array(&array)?.clone().into_parts();
133 let values = cast_fsl_to_list(values)?;
134
135 match values.data_type() {
136 List(_) => {
137 let (inner_field, inner_offsets, inner_values, _) =
138 as_list_array(&values)?.clone().into_parts();
139 let offsets =
140 get_offsets_for_flatten::<i32, i32>(inner_offsets, &offsets);
141 let flattened_array = GenericListArray::<i32>::new(
142 inner_field,
143 offsets,
144 inner_values,
145 nulls,
146 );
147
148 Ok(Arc::new(flattened_array) as ArrayRef)
149 }
150 LargeList(_) => {
151 let (inner_field, inner_offsets, inner_values, _) =
152 as_large_list_array(&values)?.clone().into_parts();
153 let offsets =
154 get_offsets_for_flatten::<i64, i32>(inner_offsets, &offsets);
155 let flattened_array = GenericListArray::<i64>::new(
156 inner_field,
157 offsets,
158 inner_values,
159 nulls,
160 );
161 Ok(Arc::new(flattened_array) as ArrayRef)
162 }
163 _ => Ok(Arc::clone(array) as ArrayRef),
164 }
165 }
166 LargeList(_) => {
167 let (_field, offsets, values, nulls) =
168 as_large_list_array(&array)?.clone().into_parts();
169 let values = cast_fsl_to_list(values)?;
170
171 match values.data_type() {
172 List(_) => {
173 let (inner_field, inner_offsets, inner_values, _) =
174 as_list_array(&values)?.clone().into_parts();
175 let offsets = get_large_offsets_for_flatten(inner_offsets, &offsets);
176 let flattened_array = GenericListArray::<i64>::new(
177 inner_field,
178 offsets,
179 inner_values,
180 nulls,
181 );
182
183 Ok(Arc::new(flattened_array) as ArrayRef)
184 }
185 LargeList(_) => {
186 let (inner_field, inner_offsets, inner_values, _) =
187 as_large_list_array(&values)?.clone().into_parts();
188 let offsets =
189 get_offsets_for_flatten::<i64, i64>(inner_offsets, &offsets);
190 let flattened_array = GenericListArray::<i64>::new(
191 inner_field,
192 offsets,
193 inner_values,
194 nulls,
195 );
196
197 Ok(Arc::new(flattened_array) as ArrayRef)
198 }
199 _ => Ok(Arc::clone(array) as ArrayRef),
200 }
201 }
202 Null => Ok(Arc::clone(array)),
203 _ => {
204 exec_err!("flatten does not support type '{}'", array.data_type())
205 }
206 }
207}
208
209fn get_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
211 inner_offsets: OffsetBuffer<O>,
212 outer_offsets: &OffsetBuffer<P>,
213) -> OffsetBuffer<O> {
214 let buffer = inner_offsets.into_inner();
215 let offsets: Vec<O> = outer_offsets
216 .iter()
217 .map(|i| buffer[i.to_usize().unwrap()])
218 .collect();
219 OffsetBuffer::new(offsets.into())
220}
221
222fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
224 inner_offsets: OffsetBuffer<O>,
225 outer_offsets: &OffsetBuffer<P>,
226) -> OffsetBuffer<i64> {
227 let buffer = inner_offsets.into_inner();
228 let offsets: Vec<i64> = outer_offsets
229 .iter()
230 .map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
231 .collect();
232 OffsetBuffer::new(offsets.into())
233}
234
235fn cast_fsl_to_list(array: ArrayRef) -> Result<ArrayRef> {
236 match array.data_type() {
237 FixedSizeList(field, _) => {
238 Ok(arrow::compute::cast(&array, &List(Arc::clone(field)))?)
239 }
240 _ => Ok(array),
241 }
242}