Skip to main content

datafusion_functions_nested/
array_compact.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_compact function.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{
22    Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, OffsetSizeTrait,
23    make_array,
24};
25use arrow::buffer::OffsetBuffer;
26use arrow::datatypes::DataType;
27use arrow::datatypes::DataType::{LargeList, List, Null};
28use datafusion_common::cast::{as_large_list_array, as_list_array};
29use datafusion_common::{Result, exec_err, utils::take_function_args};
30use datafusion_expr::{
31    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
32    Volatility,
33};
34use datafusion_macros::user_doc;
35use std::sync::Arc;
36
37make_udf_expr_and_func!(
38    ArrayCompact,
39    array_compact,
40    array,
41    "removes null values from the array.",
42    array_compact_udf
43);
44
45#[user_doc(
46    doc_section(label = "Array Functions"),
47    description = "Removes null values from the array.",
48    syntax_example = "array_compact(array)",
49    sql_example = r#"```sql
50> select array_compact([1, NULL, 2, NULL, 3]) arr;
51+-----------+
52| arr       |
53+-----------+
54| [1, 2, 3] |
55+-----------+
56```"#,
57    argument(
58        name = "array",
59        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
60    )
61)]
62#[derive(Debug, PartialEq, Eq, Hash)]
63pub struct ArrayCompact {
64    signature: Signature,
65    aliases: Vec<String>,
66}
67
68impl Default for ArrayCompact {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl ArrayCompact {
75    pub fn new() -> Self {
76        Self {
77            signature: Signature::array(Volatility::Immutable),
78            aliases: vec!["list_compact".to_string()],
79        }
80    }
81}
82
83impl ScalarUDFImpl for ArrayCompact {
84    fn name(&self) -> &str {
85        "array_compact"
86    }
87
88    fn signature(&self) -> &Signature {
89        &self.signature
90    }
91
92    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
93        Ok(arg_types[0].clone())
94    }
95
96    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
97        make_scalar_function(array_compact_inner)(&args.args)
98    }
99
100    fn aliases(&self) -> &[String] {
101        &self.aliases
102    }
103
104    fn documentation(&self) -> Option<&Documentation> {
105        self.doc()
106    }
107}
108
109/// array_compact SQL function
110fn array_compact_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
111    let [input_array] = take_function_args("array_compact", arg)?;
112
113    match &input_array.data_type() {
114        List(field) => {
115            let array = as_list_array(input_array)?;
116            compact_list::<i32>(array, field)
117        }
118        LargeList(field) => {
119            let array = as_large_list_array(input_array)?;
120            compact_list::<i64>(array, field)
121        }
122        Null => Ok(Arc::clone(input_array)),
123        array_type => exec_err!("array_compact does not support type '{array_type}'."),
124    }
125}
126
127/// Remove null elements from each row of a list array.
128fn compact_list<O: OffsetSizeTrait>(
129    list_array: &GenericListArray<O>,
130    field: &Arc<arrow::datatypes::Field>,
131) -> Result<ArrayRef> {
132    let values = list_array.values();
133
134    // Fast path: no nulls in values, return input unchanged
135    if values.null_count() == 0 {
136        return Ok(Arc::new(list_array.clone()));
137    }
138
139    let original_data = values.to_data();
140    let capacity = original_data.len() - values.null_count();
141    let mut offsets = Vec::<O>::with_capacity(list_array.len() + 1);
142    offsets.push(O::zero());
143    let mut mutable = MutableArrayData::with_capacities(
144        vec![&original_data],
145        false,
146        Capacities::Array(capacity),
147    );
148
149    for row_index in 0..list_array.len() {
150        if list_array.nulls().is_some_and(|n| n.is_null(row_index)) {
151            offsets.push(offsets[row_index]);
152            continue;
153        }
154
155        let start = list_array.offsets()[row_index].as_usize();
156        let end = list_array.offsets()[row_index + 1].as_usize();
157        let mut copied = 0usize;
158
159        // Batch consecutive non-null elements into single extend() calls
160        // to reduce per-element overhead. For [1, 2, NULL, 3, 4] this
161        // produces 2 extend calls (0..2, 3..5) instead of 4 individual ones.
162        let mut batch_start: Option<usize> = None;
163        for i in start..end {
164            if values.is_null(i) {
165                // Null breaks the current batch — flush it
166                if let Some(bs) = batch_start {
167                    mutable.extend(0, bs, i);
168                    copied += i - bs;
169                    batch_start = None;
170                }
171            } else if batch_start.is_none() {
172                batch_start = Some(i);
173            }
174        }
175        // Flush any remaining batch after the loop
176        if let Some(bs) = batch_start {
177            mutable.extend(0, bs, end);
178            copied += end - bs;
179        }
180
181        offsets.push(offsets[row_index] + O::usize_as(copied));
182    }
183
184    let new_values = make_array(mutable.freeze());
185    Ok(Arc::new(GenericListArray::<O>::try_new(
186        Arc::clone(field),
187        OffsetBuffer::new(offsets.into()),
188        new_values,
189        list_array.nulls().cloned(),
190    )?))
191}