// datafusion_functions/core/coalesce.rs
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18use arrow::array::{new_null_array, BooleanArray};
19use arrow::compute::kernels::zip::zip;
20use arrow::compute::{and, is_not_null, is_null};
21use arrow::datatypes::{DataType, Field, FieldRef};
22use datafusion_common::{exec_err, internal_err, Result};
23use datafusion_expr::binary::try_type_union_resolution;
24use datafusion_expr::{
25    ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
26};
27use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
28use datafusion_macros::user_doc;
29use itertools::Itertools;
30use std::any::Any;
31
/// Implementation of the SQL `coalesce` scalar function.
///
/// User-facing documentation (description, syntax, example, arguments) is
/// declared in the `user_doc` attribute below.
#[user_doc(
    doc_section(label = "Conditional Functions"),
    description = "Returns the first of its arguments that is not _null_. Returns _null_ if all arguments are _null_. This function is often used to substitute a default value for _null_ values.",
    syntax_example = "coalesce(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select coalesce(null, null, 'datafusion');
+----------------------------------------+
| coalesce(NULL,NULL,Utf8("datafusion")) |
+----------------------------------------+
| datafusion                             |
+----------------------------------------+
```"#,
    argument(
        name = "expression1, expression_n",
        description = "Expression to use if previous expressions are _null_. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary."
    )
)]
#[derive(Debug)]
pub struct CoalesceFunc {
    /// Signature returned by `ScalarUDFImpl::signature`; built in `CoalesceFunc::new`
    /// as an immutable, user-defined signature.
    signature: Signature,
}
53
54impl Default for CoalesceFunc {
55    fn default() -> Self {
56        CoalesceFunc::new()
57    }
58}
59
60impl CoalesceFunc {
61    pub fn new() -> Self {
62        Self {
63            signature: Signature::user_defined(Volatility::Immutable),
64        }
65    }
66}
67
68impl ScalarUDFImpl for CoalesceFunc {
69    fn as_any(&self) -> &dyn Any {
70        self
71    }
72
73    fn name(&self) -> &str {
74        "coalesce"
75    }
76
77    fn signature(&self) -> &Signature {
78        &self.signature
79    }
80
81    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
82        internal_err!("return_field_from_args should be called instead")
83    }
84
85    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
86        // If any the arguments in coalesce is non-null, the result is non-null
87        let nullable = args.arg_fields.iter().all(|f| f.is_nullable());
88        let return_type = args
89            .arg_fields
90            .iter()
91            .map(|f| f.data_type())
92            .find_or_first(|d| !d.is_null())
93            .unwrap()
94            .clone();
95        Ok(Field::new(self.name(), return_type, nullable).into())
96    }
97
98    /// coalesce evaluates to the first value which is not NULL
99    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
100        let args = args.args;
101        // do not accept 0 arguments.
102        if args.is_empty() {
103            return exec_err!(
104                "coalesce was called with {} arguments. It requires at least 1.",
105                args.len()
106            );
107        }
108
109        let return_type = args[0].data_type();
110        let mut return_array = args.iter().filter_map(|x| match x {
111            ColumnarValue::Array(array) => Some(array.len()),
112            _ => None,
113        });
114
115        if let Some(size) = return_array.next() {
116            // start with nulls as default output
117            let mut current_value = new_null_array(&return_type, size);
118            let mut remainder = BooleanArray::from(vec![true; size]);
119
120            for arg in args {
121                match arg {
122                    ColumnarValue::Array(ref array) => {
123                        let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
124                        current_value = zip(&to_apply, array, &current_value)?;
125                        remainder = and(&remainder, &is_null(array)?)?;
126                    }
127                    ColumnarValue::Scalar(value) => {
128                        if value.is_null() {
129                            continue;
130                        } else {
131                            let last_value = value.to_scalar()?;
132                            current_value = zip(&remainder, &last_value, &current_value)?;
133                            break;
134                        }
135                    }
136                }
137                if remainder.iter().all(|x| x == Some(false)) {
138                    break;
139                }
140            }
141            Ok(ColumnarValue::Array(current_value))
142        } else {
143            let result = args
144                .iter()
145                .filter_map(|x| match x {
146                    ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
147                    _ => None,
148                })
149                .next()
150                .unwrap_or_else(|| args[0].clone());
151            Ok(result)
152        }
153    }
154
155    fn short_circuits(&self) -> bool {
156        true
157    }
158
159    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
160        if arg_types.is_empty() {
161            return exec_err!("coalesce must have at least one argument");
162        }
163
164        try_type_union_resolution(arg_types)
165    }
166
167    fn documentation(&self) -> Option<&Documentation> {
168        self.doc()
169    }
170}