datafusion_functions/core/
coalesce.rs1use arrow::array::{new_null_array, BooleanArray};
19use arrow::compute::kernels::zip::zip;
20use arrow::compute::{and, is_not_null, is_null};
21use arrow::datatypes::{DataType, Field, FieldRef};
22use datafusion_common::{exec_err, internal_err, Result};
23use datafusion_expr::binary::try_type_union_resolution;
24use datafusion_expr::{
25 ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
26};
27use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
28use datafusion_macros::user_doc;
29use itertools::Itertools;
30use std::any::Any;
31
32#[user_doc(
33 doc_section(label = "Conditional Functions"),
34 description = "Returns the first of its arguments that is not _null_. Returns _null_ if all arguments are _null_. This function is often used to substitute a default value for _null_ values.",
35 syntax_example = "coalesce(expression1[, ..., expression_n])",
36 sql_example = r#"```sql
37> select coalesce(null, null, 'datafusion');
38+----------------------------------------+
39| coalesce(NULL,NULL,Utf8("datafusion")) |
40+----------------------------------------+
41| datafusion |
42+----------------------------------------+
43```"#,
44 argument(
45 name = "expression1, expression_n",
46 description = "Expression to use if previous expressions are _null_. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary."
47 )
48)]
49#[derive(Debug)]
50pub struct CoalesceFunc {
51 signature: Signature,
52}
53
54impl Default for CoalesceFunc {
55 fn default() -> Self {
56 CoalesceFunc::new()
57 }
58}
59
60impl CoalesceFunc {
61 pub fn new() -> Self {
62 Self {
63 signature: Signature::user_defined(Volatility::Immutable),
64 }
65 }
66}
67
68impl ScalarUDFImpl for CoalesceFunc {
69 fn as_any(&self) -> &dyn Any {
70 self
71 }
72
73 fn name(&self) -> &str {
74 "coalesce"
75 }
76
77 fn signature(&self) -> &Signature {
78 &self.signature
79 }
80
81 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
82 internal_err!("return_field_from_args should be called instead")
83 }
84
85 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
86 let nullable = args.arg_fields.iter().all(|f| f.is_nullable());
88 let return_type = args
89 .arg_fields
90 .iter()
91 .map(|f| f.data_type())
92 .find_or_first(|d| !d.is_null())
93 .unwrap()
94 .clone();
95 Ok(Field::new(self.name(), return_type, nullable).into())
96 }
97
98 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
100 let args = args.args;
101 if args.is_empty() {
103 return exec_err!(
104 "coalesce was called with {} arguments. It requires at least 1.",
105 args.len()
106 );
107 }
108
109 let return_type = args[0].data_type();
110 let mut return_array = args.iter().filter_map(|x| match x {
111 ColumnarValue::Array(array) => Some(array.len()),
112 _ => None,
113 });
114
115 if let Some(size) = return_array.next() {
116 let mut current_value = new_null_array(&return_type, size);
118 let mut remainder = BooleanArray::from(vec![true; size]);
119
120 for arg in args {
121 match arg {
122 ColumnarValue::Array(ref array) => {
123 let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
124 current_value = zip(&to_apply, array, ¤t_value)?;
125 remainder = and(&remainder, &is_null(array)?)?;
126 }
127 ColumnarValue::Scalar(value) => {
128 if value.is_null() {
129 continue;
130 } else {
131 let last_value = value.to_scalar()?;
132 current_value = zip(&remainder, &last_value, ¤t_value)?;
133 break;
134 }
135 }
136 }
137 if remainder.iter().all(|x| x == Some(false)) {
138 break;
139 }
140 }
141 Ok(ColumnarValue::Array(current_value))
142 } else {
143 let result = args
144 .iter()
145 .filter_map(|x| match x {
146 ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
147 _ => None,
148 })
149 .next()
150 .unwrap_or_else(|| args[0].clone());
151 Ok(result)
152 }
153 }
154
155 fn short_circuits(&self) -> bool {
156 true
157 }
158
159 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
160 if arg_types.is_empty() {
161 return exec_err!("coalesce must have at least one argument");
162 }
163
164 try_type_union_resolution(arg_types)
165 }
166
167 fn documentation(&self) -> Option<&Documentation> {
168 self.doc()
169 }
170}