use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::{Result, exec_err, internal_err, plan_err};
use datafusion_expr::binary::try_type_union_resolution;
use datafusion_expr::conditional_expressions::CaseBuilder;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
};
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use datafusion_macros::user_doc;
use itertools::Itertools;
use std::any::Any;
#[user_doc(
doc_section(label = "Conditional Functions"),
description = "Returns the first of its arguments that is not _null_. Returns _null_ if all arguments are _null_. This function is often used to substitute a default value for _null_ values.",
syntax_example = "coalesce(expression1[, ..., expression_n])",
sql_example = r#"```sql
> select coalesce(null, null, 'datafusion');
+----------------------------------------+
| coalesce(NULL,NULL,Utf8("datafusion")) |
+----------------------------------------+
| datafusion |
+----------------------------------------+
```"#,
argument(
name = "expression1, expression_n",
description = "Expression to use if previous expressions are _null_. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary."
)
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CoalesceFunc {
pub(super) signature: Signature,
}
impl Default for CoalesceFunc {
fn default() -> Self {
CoalesceFunc::new()
}
}
impl CoalesceFunc {
pub fn new() -> Self {
Self {
signature: Signature::user_defined(Volatility::Immutable),
}
}
}
impl ScalarUDFImpl for CoalesceFunc {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"coalesce"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
internal_err!("return_field_from_args should be called instead")
}
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
let nullable = args.arg_fields.iter().all(|f| f.is_nullable());
let return_type = args
.arg_fields
.iter()
.map(|f| f.data_type())
.find_or_first(|d| !d.is_null())
.unwrap()
.clone();
Ok(Field::new(self.name(), return_type, nullable).into())
}
fn simplify(
&self,
args: Vec<Expr>,
_info: &dyn SimplifyInfo,
) -> Result<ExprSimplifyResult> {
if args.is_empty() {
return plan_err!("coalesce must have at least one argument");
}
if args.len() == 1 {
return Ok(ExprSimplifyResult::Simplified(
args.into_iter().next().unwrap(),
));
}
let n = args.len();
let (init, last_elem) = args.split_at(n - 1);
let whens = init
.iter()
.map(|x| x.clone().is_not_null())
.collect::<Vec<_>>();
let cases = init.to_vec();
Ok(ExprSimplifyResult::Simplified(
CaseBuilder::new(None, whens, cases, Some(Box::new(last_elem[0].clone())))
.end()?,
))
}
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
internal_err!("coalesce should have been simplified to case")
}
fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
let eager = vec![&args[0]];
let lazy = args[1..].iter().collect();
Some((eager, lazy))
}
fn short_circuits(&self) -> bool {
true
}
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
if arg_types.is_empty() {
return exec_err!("coalesce must have at least one argument");
}
try_type_union_resolution(arg_types)
}
fn documentation(&self) -> Option<&Documentation> {
self.doc()
}
}