datafusion_expr/
expr_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{Between, Expr, Like, predicate_bounds};
19use crate::expr::{
20    AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
21    InSubquery, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
22    WindowFunctionParams,
23};
24use crate::type_coercion::functions::fields_with_udf;
25use crate::udf::ReturnFieldArgs;
26use crate::{LogicalPlan, Projection, Subquery, WindowFunctionDefinition, utils};
27use arrow::compute::can_cast_types;
28use arrow::datatypes::{DataType, Field};
29use datafusion_common::datatype::FieldExt;
30use datafusion_common::metadata::FieldMetadata;
31use datafusion_common::{
32    Column, DataFusionError, ExprSchema, Result, ScalarValue, Spans, TableReference,
33    not_impl_err, plan_datafusion_err, plan_err,
34};
35use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
36use datafusion_functions_window_common::field::WindowUDFFieldArgs;
37use std::sync::Arc;
38
39/// Trait to allow expr to typable with respect to a schema
40pub trait ExprSchemable {
41    /// Given a schema, return the type of the expr
42    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType>;
43
44    /// Given a schema, return the nullability of the expr
45    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool>;
46
47    /// Given a schema, return the expr's optional metadata
48    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata>;
49
50    /// Convert to a field with respect to a schema
51    fn to_field(
52        &self,
53        input_schema: &dyn ExprSchema,
54    ) -> Result<(Option<TableReference>, Arc<Field>)>;
55
56    /// Cast to a type with respect to a schema
57    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr>;
58
59    /// Given a schema, return the type and nullability of the expr
60    #[deprecated(
61        since = "51.0.0",
62        note = "Use `to_field().1.is_nullable` and `to_field().1.data_type()` directly instead"
63    )]
64    fn data_type_and_nullable(&self, schema: &dyn ExprSchema)
65    -> Result<(DataType, bool)>;
66}
67
68impl ExprSchemable for Expr {
69    /// Returns the [arrow::datatypes::DataType] of the expression
70    /// based on [ExprSchema]
71    ///
72    /// Note: [`DFSchema`] implements [ExprSchema].
73    ///
74    /// [`DFSchema`]: datafusion_common::DFSchema
75    ///
76    /// # Examples
77    ///
78    /// Get the type of an expression that adds 2 columns. Adding an Int32
79    /// and Float32 results in Float32 type
80    ///
81    /// ```
82    /// # use arrow::datatypes::{DataType, Field};
83    /// # use datafusion_common::DFSchema;
84    /// # use datafusion_expr::{col, ExprSchemable};
85    /// # use std::collections::HashMap;
86    ///
87    /// fn main() {
88    ///     let expr = col("c1") + col("c2");
89    ///     let schema = DFSchema::from_unqualified_fields(
90    ///         vec![
91    ///             Field::new("c1", DataType::Int32, true),
92    ///             Field::new("c2", DataType::Float32, true),
93    ///         ]
94    ///         .into(),
95    ///         HashMap::new(),
96    ///     )
97    ///     .unwrap();
98    ///     assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap()));
99    /// }
100    /// ```
101    ///
102    /// # Errors
103    ///
104    /// This function errors when it is not possible to compute its
105    /// [arrow::datatypes::DataType].  This happens when e.g. the
106    /// expression refers to a column that does not exist in the
107    /// schema, or when the expression is incorrectly typed
108    /// (e.g. `[utf8] + [bool]`).
109    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
110    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
111        match self {
112            Expr::Alias(Alias { expr, name, .. }) => match &**expr {
113                Expr::Placeholder(Placeholder { field, .. }) => match &field {
114                    None => schema.data_type(&Column::from_name(name)).cloned(),
115                    Some(field) => Ok(field.data_type().clone()),
116                },
117                _ => expr.get_type(schema),
118            },
119            Expr::Negative(expr) => expr.get_type(schema),
120            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
121            Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
122            Expr::ScalarVariable(field, _) => Ok(field.data_type().clone()),
123            Expr::Literal(l, _) => Ok(l.data_type()),
124            Expr::Case(case) => {
125                for (_, then_expr) in &case.when_then_expr {
126                    let then_type = then_expr.get_type(schema)?;
127                    if !then_type.is_null() {
128                        return Ok(then_type);
129                    }
130                }
131                case.else_expr
132                    .as_ref()
133                    .map_or(Ok(DataType::Null), |e| e.get_type(schema))
134            }
135            Expr::Cast(Cast { data_type, .. })
136            | Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
137            Expr::Unnest(Unnest { expr }) => {
138                let arg_data_type = expr.get_type(schema)?;
139                // Unnest's output type is the inner type of the list
140                match arg_data_type {
141                    DataType::List(field)
142                    | DataType::LargeList(field)
143                    | DataType::FixedSizeList(field, _) => Ok(field.data_type().clone()),
144                    DataType::Struct(_) => Ok(arg_data_type),
145                    DataType::Null => {
146                        not_impl_err!("unnest() does not support null yet")
147                    }
148                    _ => {
149                        plan_err!(
150                            "unnest() can only be applied to array, struct and null"
151                        )
152                    }
153                }
154            }
155            Expr::ScalarFunction(_func) => {
156                let return_type = self.to_field(schema)?.1.data_type().clone();
157                Ok(return_type)
158            }
159            Expr::WindowFunction(window_function) => self
160                .data_type_and_nullable_with_window_function(schema, window_function)
161                .map(|(return_type, _)| return_type),
162            Expr::AggregateFunction(AggregateFunction {
163                func,
164                params: AggregateFunctionParams { args, .. },
165            }) => {
166                let fields = args
167                    .iter()
168                    .map(|e| e.to_field(schema).map(|(_, f)| f))
169                    .collect::<Result<Vec<_>>>()?;
170                let new_fields = fields_with_udf(&fields, func.as_ref())
171                    .map_err(|err| {
172                        let data_types = fields
173                            .iter()
174                            .map(|f| f.data_type().clone())
175                            .collect::<Vec<_>>();
176                        plan_datafusion_err!(
177                            "{} {}",
178                            match err {
179                                DataFusionError::Plan(msg) => msg,
180                                err => err.to_string(),
181                            },
182                            utils::generate_signature_error_msg(
183                                func.name(),
184                                func.signature().clone(),
185                                &data_types
186                            )
187                        )
188                    })?
189                    .into_iter()
190                    .collect::<Vec<_>>();
191                Ok(func.return_field(&new_fields)?.data_type().clone())
192            }
193            Expr::Not(_)
194            | Expr::IsNull(_)
195            | Expr::Exists { .. }
196            | Expr::InSubquery(_)
197            | Expr::Between { .. }
198            | Expr::InList { .. }
199            | Expr::IsNotNull(_)
200            | Expr::IsTrue(_)
201            | Expr::IsFalse(_)
202            | Expr::IsUnknown(_)
203            | Expr::IsNotTrue(_)
204            | Expr::IsNotFalse(_)
205            | Expr::IsNotUnknown(_) => Ok(DataType::Boolean),
206            Expr::ScalarSubquery(subquery) => {
207                Ok(subquery.subquery.schema().field(0).data_type().clone())
208            }
209            Expr::BinaryExpr(BinaryExpr { left, right, op }) => BinaryTypeCoercer::new(
210                &left.get_type(schema)?,
211                op,
212                &right.get_type(schema)?,
213            )
214            .get_result_type(),
215            Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
216            Expr::Placeholder(Placeholder { field, .. }) => {
217                if let Some(field) = field {
218                    Ok(field.data_type().clone())
219                } else {
220                    // If the placeholder's type hasn't been specified, treat it as
221                    // null (unspecified placeholders generate an error during planning)
222                    Ok(DataType::Null)
223                }
224            }
225            #[expect(deprecated)]
226            Expr::Wildcard { .. } => Ok(DataType::Null),
227            Expr::GroupingSet(_) => {
228                // Grouping sets do not really have a type and do not appear in projections
229                Ok(DataType::Null)
230            }
231        }
232    }
233
234    /// Returns the nullability of the expression based on [ExprSchema].
235    ///
236    /// Note: [`DFSchema`] implements [ExprSchema].
237    ///
238    /// [`DFSchema`]: datafusion_common::DFSchema
239    ///
240    /// # Errors
241    ///
242    /// This function errors when it is not possible to compute its
243    /// nullability.  This happens when the expression refers to a
244    /// column that does not exist in the schema.
245    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool> {
246        match self {
247            Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) | Expr::Negative(expr) => {
248                expr.nullable(input_schema)
249            }
250
251            Expr::InList(InList { expr, list, .. }) => {
252                // Avoid inspecting too many expressions.
253                const MAX_INSPECT_LIMIT: usize = 6;
254                // Stop if a nullable expression is found or an error occurs.
255                let has_nullable = std::iter::once(expr.as_ref())
256                    .chain(list)
257                    .take(MAX_INSPECT_LIMIT)
258                    .find_map(|e| {
259                        e.nullable(input_schema)
260                            .map(|nullable| if nullable { Some(()) } else { None })
261                            .transpose()
262                    })
263                    .transpose()?;
264                Ok(match has_nullable {
265                    // If a nullable subexpression is found, the result may also be nullable.
266                    Some(_) => true,
267                    // If the list is too long, we assume it is nullable.
268                    None if list.len() + 1 > MAX_INSPECT_LIMIT => true,
269                    // All the subexpressions are non-nullable, so the result must be non-nullable.
270                    _ => false,
271                })
272            }
273
274            Expr::Between(Between {
275                expr, low, high, ..
276            }) => Ok(expr.nullable(input_schema)?
277                || low.nullable(input_schema)?
278                || high.nullable(input_schema)?),
279
280            Expr::Column(c) => input_schema.nullable(c),
281            Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
282            Expr::Literal(value, _) => Ok(value.is_null()),
283            Expr::Case(case) => {
284                let nullable_then = case
285                    .when_then_expr
286                    .iter()
287                    .filter_map(|(w, t)| {
288                        let is_nullable = match t.nullable(input_schema) {
289                            Err(e) => return Some(Err(e)),
290                            Ok(n) => n,
291                        };
292
293                        // Branches with a then expression that is not nullable do not impact the
294                        // nullability of the case expression.
295                        if !is_nullable {
296                            return None;
297                        }
298
299                        // For case-with-expression assume all 'then' expressions are reachable
300                        if case.expr.is_some() {
301                            return Some(Ok(()));
302                        }
303
304                        // For branches with a nullable 'then' expression, try to determine
305                        // if the 'then' expression is ever reachable in the situation where
306                        // it would evaluate to null.
307                        let bounds = match predicate_bounds::evaluate_bounds(
308                            w,
309                            Some(unwrap_certainly_null_expr(t)),
310                            input_schema,
311                        ) {
312                            Err(e) => return Some(Err(e)),
313                            Ok(b) => b,
314                        };
315
316                        let can_be_true = match bounds
317                            .contains_value(ScalarValue::Boolean(Some(true)))
318                        {
319                            Err(e) => return Some(Err(e)),
320                            Ok(b) => b,
321                        };
322
323                        if !can_be_true {
324                            // If the derived 'when' expression can never evaluate to true, the
325                            // 'then' expression is not reachable when it would evaluate to NULL.
326                            // The most common pattern for this is `WHEN x IS NOT NULL THEN x`.
327                            None
328                        } else {
329                            // The branch might be taken
330                            Some(Ok(()))
331                        }
332                    })
333                    .next();
334
335                if let Some(nullable_then) = nullable_then {
336                    // There is at least one reachable nullable 'then' expression, so the case
337                    // expression itself is nullable.
338                    // Use `Result::map` to propagate the error from `nullable_then` if there is one.
339                    nullable_then.map(|_| true)
340                } else if let Some(e) = &case.else_expr {
341                    // There are no reachable nullable 'then' expressions, so all we still need to
342                    // check is the 'else' expression's nullability.
343                    e.nullable(input_schema)
344                } else {
345                    // CASE produces NULL if there is no `else` expr
346                    // (aka when none of the `when_then_exprs` match)
347                    Ok(true)
348                }
349            }
350            Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
351            Expr::ScalarFunction(_func) => {
352                let field = self.to_field(input_schema)?.1;
353
354                let nullable = field.is_nullable();
355                Ok(nullable)
356            }
357            Expr::AggregateFunction(AggregateFunction { func, .. }) => {
358                Ok(func.is_nullable())
359            }
360            Expr::WindowFunction(window_function) => self
361                .data_type_and_nullable_with_window_function(
362                    input_schema,
363                    window_function,
364                )
365                .map(|(_, nullable)| nullable),
366            Expr::ScalarVariable(field, _) => Ok(field.is_nullable()),
367            Expr::TryCast { .. } | Expr::Unnest(_) | Expr::Placeholder(_) => Ok(true),
368            Expr::IsNull(_)
369            | Expr::IsNotNull(_)
370            | Expr::IsTrue(_)
371            | Expr::IsFalse(_)
372            | Expr::IsUnknown(_)
373            | Expr::IsNotTrue(_)
374            | Expr::IsNotFalse(_)
375            | Expr::IsNotUnknown(_)
376            | Expr::Exists { .. } => Ok(false),
377            Expr::InSubquery(InSubquery { expr, .. }) => expr.nullable(input_schema),
378            Expr::ScalarSubquery(subquery) => {
379                Ok(subquery.subquery.schema().field(0).is_nullable())
380            }
381            Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
382                Ok(left.nullable(input_schema)? || right.nullable(input_schema)?)
383            }
384            Expr::Like(Like { expr, pattern, .. })
385            | Expr::SimilarTo(Like { expr, pattern, .. }) => {
386                Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
387            }
388            #[expect(deprecated)]
389            Expr::Wildcard { .. } => Ok(false),
390            Expr::GroupingSet(_) => {
391                // Grouping sets do not really have the concept of nullable and do not appear
392                // in projections
393                Ok(true)
394            }
395        }
396    }
397
398    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata> {
399        self.to_field(schema)
400            .map(|(_, field)| FieldMetadata::from(field.metadata()))
401    }
402
403    /// Returns the datatype and nullability of the expression based on [ExprSchema].
404    ///
405    /// Note: [`DFSchema`] implements [ExprSchema].
406    ///
407    /// [`DFSchema`]: datafusion_common::DFSchema
408    ///
409    /// # Errors
410    ///
411    /// This function errors when it is not possible to compute its
412    /// datatype or nullability.
413    fn data_type_and_nullable(
414        &self,
415        schema: &dyn ExprSchema,
416    ) -> Result<(DataType, bool)> {
417        let field = self.to_field(schema)?.1;
418
419        Ok((field.data_type().clone(), field.is_nullable()))
420    }
421
422    /// Returns a [arrow::datatypes::Field] compatible with this expression.
423    ///
424    /// This function converts an expression into a field with appropriate metadata
425    /// and nullability based on the expression type and context. It is the primary
426    /// mechanism for determining field-level schemas.
427    ///
428    /// # Field Property Resolution
429    ///
430    /// For each expression, the following properties are determined:
431    ///
432    /// ## Data Type Resolution
433    /// - **Column references**: Data type from input schema field
434    /// - **Literals**: Data type inferred from literal value
435    /// - **Aliases**: Data type inherited from the underlying expression (the aliased expression)
436    /// - **Binary expressions**: Result type from type coercion rules
437    /// - **Boolean expressions**: Always a boolean type
438    /// - **Cast expressions**: Target data type from cast operation
439    /// - **Function calls**: Return type based on function signature and argument types
440    ///
441    /// ## Nullability Determination
442    /// - **Column references**: Inherit nullability from input schema field
443    /// - **Literals**: Nullable only if literal value is NULL
444    /// - **Aliases**: Inherit nullability from the underlying expression (the aliased expression)
445    /// - **Binary expressions**: Nullable if either operand is nullable
446    /// - **Boolean expressions**: Always non-nullable (IS NULL, EXISTS, etc.)
447    /// - **Cast expressions**: determined by the input expression's nullability rules
448    /// - **Function calls**: Based on function nullability rules and input nullability
449    ///
450    /// ## Metadata Handling
451    /// - **Column references**: Preserve original field metadata from input schema
452    /// - **Literals**: Use explicitly provided metadata, otherwise empty
453    /// - **Aliases**: Merge underlying expr metadata with alias-specific metadata, preferring the alias metadata
454    /// - **Binary expressions**: field metadata is empty
455    /// - **Boolean expressions**: field metadata is empty
456    /// - **Cast expressions**: determined by the input expression's field metadata handling
457    /// - **Scalar functions**: Generate metadata via function's [`return_field_from_args`] method,
458    ///   with the default implementation returning empty field metadata
459    /// - **Aggregate functions**: Generate metadata via function's [`return_field`] method,
460    ///   with the default implementation returning empty field metadata
461    /// - **Window functions**: field metadata is empty
462    ///
463    /// ## Table Reference Scoping
464    /// - Establishes proper qualified field references when columns belong to specific tables
465    /// - Maintains table context for accurate field resolution in multi-table scenarios
466    ///
467    /// So for example, a projected expression `col(c1) + col(c2)` is
468    /// placed in an output field **named** col("c1 + c2")
469    ///
470    /// [`return_field_from_args`]: crate::ScalarUDF::return_field_from_args
471    /// [`return_field`]: crate::AggregateUDF::return_field
472    fn to_field(
473        &self,
474        schema: &dyn ExprSchema,
475    ) -> Result<(Option<TableReference>, Arc<Field>)> {
476        let (relation, schema_name) = self.qualified_name();
477        #[expect(deprecated)]
478        let field = match self {
479            Expr::Alias(Alias {
480                expr,
481                name: _,
482                metadata,
483                ..
484            }) => {
485                let mut combined_metadata = expr.metadata(schema)?;
486                if let Some(metadata) = metadata {
487                    combined_metadata.extend(metadata.clone());
488                }
489
490                Ok(expr
491                    .to_field(schema)
492                    .map(|(_, f)| f)?
493                    .with_field_metadata(&combined_metadata))
494            }
495            Expr::Negative(expr) => expr.to_field(schema).map(|(_, f)| f),
496            Expr::Column(c) => schema.field_from_column(c).map(Arc::clone),
497            Expr::OuterReferenceColumn(field, _) => {
498                Ok(Arc::clone(field).renamed(&schema_name))
499            }
500            Expr::ScalarVariable(field, _) => Ok(Arc::clone(field).renamed(&schema_name)),
501            Expr::Literal(l, metadata) => Ok(Arc::new(
502                Field::new(&schema_name, l.data_type(), l.is_null())
503                    .with_field_metadata_opt(metadata.as_ref()),
504            )),
505            Expr::IsNull(_)
506            | Expr::IsNotNull(_)
507            | Expr::IsTrue(_)
508            | Expr::IsFalse(_)
509            | Expr::IsUnknown(_)
510            | Expr::IsNotTrue(_)
511            | Expr::IsNotFalse(_)
512            | Expr::IsNotUnknown(_)
513            | Expr::Exists { .. } => {
514                Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false)))
515            }
516            Expr::ScalarSubquery(subquery) => {
517                Ok(Arc::clone(&subquery.subquery.schema().fields()[0]))
518            }
519            Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
520                let (left_field, right_field) =
521                    (left.to_field(schema)?.1, right.to_field(schema)?.1);
522
523                let (lhs_type, lhs_nullable) =
524                    (left_field.data_type(), left_field.is_nullable());
525                let (rhs_type, rhs_nullable) =
526                    (right_field.data_type(), right_field.is_nullable());
527                let mut coercer = BinaryTypeCoercer::new(lhs_type, op, rhs_type);
528                coercer.set_lhs_spans(left.spans().cloned().unwrap_or_default());
529                coercer.set_rhs_spans(right.spans().cloned().unwrap_or_default());
530                Ok(Arc::new(Field::new(
531                    &schema_name,
532                    coercer.get_result_type()?,
533                    lhs_nullable || rhs_nullable,
534                )))
535            }
536            Expr::WindowFunction(window_function) => {
537                let (dt, nullable) = self.data_type_and_nullable_with_window_function(
538                    schema,
539                    window_function,
540                )?;
541                Ok(Arc::new(Field::new(&schema_name, dt, nullable)))
542            }
543            Expr::AggregateFunction(aggregate_function) => {
544                let AggregateFunction {
545                    func,
546                    params: AggregateFunctionParams { args, .. },
547                    ..
548                } = aggregate_function;
549
550                let fields = args
551                    .iter()
552                    .map(|e| e.to_field(schema).map(|(_, f)| f))
553                    .collect::<Result<Vec<_>>>()?;
554                // Verify that function is invoked with correct number and type of arguments as defined in `TypeSignature`
555                let new_fields = fields_with_udf(&fields, func.as_ref())
556                    .map_err(|err| {
557                        let arg_types = fields
558                            .iter()
559                            .map(|f| f.data_type())
560                            .cloned()
561                            .collect::<Vec<_>>();
562                        plan_datafusion_err!(
563                            "{} {}",
564                            match err {
565                                DataFusionError::Plan(msg) => msg,
566                                err => err.to_string(),
567                            },
568                            utils::generate_signature_error_msg(
569                                func.name(),
570                                func.signature().clone(),
571                                &arg_types,
572                            )
573                        )
574                    })?
575                    .into_iter()
576                    .collect::<Vec<_>>();
577
578                func.return_field(&new_fields)
579            }
580            Expr::ScalarFunction(ScalarFunction { func, args }) => {
581                let (arg_types, fields): (Vec<DataType>, Vec<Arc<Field>>) = args
582                    .iter()
583                    .map(|e| e.to_field(schema).map(|(_, f)| f))
584                    .collect::<Result<Vec<_>>>()?
585                    .into_iter()
586                    .map(|f| (f.data_type().clone(), f))
587                    .unzip();
588                // Verify that function is invoked with correct number and type of arguments as defined in `TypeSignature`
589                let new_fields =
590                    fields_with_udf(&fields, func.as_ref()).map_err(|err| {
591                        plan_datafusion_err!(
592                            "{} {}",
593                            match err {
594                                DataFusionError::Plan(msg) => msg,
595                                err => err.to_string(),
596                            },
597                            utils::generate_signature_error_msg(
598                                func.name(),
599                                func.signature().clone(),
600                                &arg_types,
601                            )
602                        )
603                    })?;
604
605                let arguments = args
606                    .iter()
607                    .map(|e| match e {
608                        Expr::Literal(sv, _) => Some(sv),
609                        _ => None,
610                    })
611                    .collect::<Vec<_>>();
612                let args = ReturnFieldArgs {
613                    arg_fields: &new_fields,
614                    scalar_arguments: &arguments,
615                };
616
617                func.return_field_from_args(args)
618            }
619            // _ => Ok((self.get_type(schema)?, self.nullable(schema)?)),
620            Expr::Cast(Cast { expr, data_type }) => expr
621                .to_field(schema)
622                .map(|(_, f)| f.retyped(data_type.clone())),
623            Expr::Placeholder(Placeholder {
624                id: _,
625                field: Some(field),
626            }) => Ok(Arc::clone(field).renamed(&schema_name)),
627            Expr::Like(_)
628            | Expr::SimilarTo(_)
629            | Expr::Not(_)
630            | Expr::Between(_)
631            | Expr::Case(_)
632            | Expr::TryCast(_)
633            | Expr::InList(_)
634            | Expr::InSubquery(_)
635            | Expr::Wildcard { .. }
636            | Expr::GroupingSet(_)
637            | Expr::Placeholder(_)
638            | Expr::Unnest(_) => Ok(Arc::new(Field::new(
639                &schema_name,
640                self.get_type(schema)?,
641                self.nullable(schema)?,
642            ))),
643        }?;
644
645        Ok((
646            relation,
647            // todo avoid this rename / use the name above
648            field.renamed(&schema_name),
649        ))
650    }
651
652    /// Wraps this expression in a cast to a target [arrow::datatypes::DataType].
653    ///
654    /// # Errors
655    ///
656    /// This function errors when it is impossible to cast the
657    /// expression to the target [arrow::datatypes::DataType].
658    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr> {
659        let this_type = self.get_type(schema)?;
660        if this_type == *cast_to_type {
661            return Ok(self);
662        }
663
664        // TODO(kszucs): Most of the operations do not validate the type correctness
665        // like all of the binary expressions below. Perhaps Expr should track the
666        // type of the expression?
667
668        if can_cast_types(&this_type, cast_to_type) {
669            match self {
670                Expr::ScalarSubquery(subquery) => {
671                    Ok(Expr::ScalarSubquery(cast_subquery(subquery, cast_to_type)?))
672                }
673                _ => Ok(Expr::Cast(Cast::new(Box::new(self), cast_to_type.clone()))),
674            }
675        } else {
676            plan_err!("Cannot automatically convert {this_type} to {cast_to_type}")
677        }
678    }
679}
680
681/// Returns the innermost [Expr] that is provably null if `expr` is null.
682fn unwrap_certainly_null_expr(expr: &Expr) -> &Expr {
683    match expr {
684        Expr::Not(e) => unwrap_certainly_null_expr(e),
685        Expr::Negative(e) => unwrap_certainly_null_expr(e),
686        Expr::Cast(e) => unwrap_certainly_null_expr(e.expr.as_ref()),
687        _ => expr,
688    }
689}
690
691impl Expr {
692    /// Common method for window functions that applies type coercion
693    /// to all arguments of the window function to check if it matches
694    /// its signature.
695    ///
696    /// If successful, this method returns the data type and
697    /// nullability of the window function's result.
698    ///
699    /// Otherwise, returns an error if there's a type mismatch between
700    /// the window function's signature and the provided arguments.
701    fn data_type_and_nullable_with_window_function(
702        &self,
703        schema: &dyn ExprSchema,
704        window_function: &WindowFunction,
705    ) -> Result<(DataType, bool)> {
706        let WindowFunction {
707            fun,
708            params: WindowFunctionParams { args, .. },
709            ..
710        } = window_function;
711
712        let fields = args
713            .iter()
714            .map(|e| e.to_field(schema).map(|(_, f)| f))
715            .collect::<Result<Vec<_>>>()?;
716        match fun {
717            WindowFunctionDefinition::AggregateUDF(udaf) => {
718                let data_types = fields
719                    .iter()
720                    .map(|f| f.data_type())
721                    .cloned()
722                    .collect::<Vec<_>>();
723                let new_fields = fields_with_udf(&fields, udaf.as_ref())
724                    .map_err(|err| {
725                        plan_datafusion_err!(
726                            "{} {}",
727                            match err {
728                                DataFusionError::Plan(msg) => msg,
729                                err => err.to_string(),
730                            },
731                            utils::generate_signature_error_msg(
732                                fun.name(),
733                                fun.signature(),
734                                &data_types
735                            )
736                        )
737                    })?
738                    .into_iter()
739                    .collect::<Vec<_>>();
740
741                let return_field = udaf.return_field(&new_fields)?;
742
743                Ok((return_field.data_type().clone(), return_field.is_nullable()))
744            }
745            WindowFunctionDefinition::WindowUDF(udwf) => {
746                let data_types = fields
747                    .iter()
748                    .map(|f| f.data_type())
749                    .cloned()
750                    .collect::<Vec<_>>();
751                let new_fields = fields_with_udf(&fields, udwf.as_ref())
752                    .map_err(|err| {
753                        plan_datafusion_err!(
754                            "{} {}",
755                            match err {
756                                DataFusionError::Plan(msg) => msg,
757                                err => err.to_string(),
758                            },
759                            utils::generate_signature_error_msg(
760                                fun.name(),
761                                fun.signature(),
762                                &data_types
763                            )
764                        )
765                    })?
766                    .into_iter()
767                    .collect::<Vec<_>>();
768                let (_, function_name) = self.qualified_name();
769                let field_args = WindowUDFFieldArgs::new(&new_fields, &function_name);
770
771                udwf.field(field_args)
772                    .map(|field| (field.data_type().clone(), field.is_nullable()))
773            }
774        }
775    }
776}
777
778/// Cast subquery in InSubquery/ScalarSubquery to a given type.
779///
780/// 1. **Projection plan**: If the subquery is a projection (i.e. a SELECT statement with specific
781///    columns), it casts the first expression in the projection to the target type and creates a
782///    new projection with the casted expression.
783/// 2. **Non-projection plan**: If the subquery isn't a projection, it adds a projection to the plan
784///    with the casted first column.
785pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
786    if subquery.subquery.schema().field(0).data_type() == cast_to_type {
787        return Ok(subquery);
788    }
789
790    let plan = subquery.subquery.as_ref();
791    let new_plan = match plan {
792        LogicalPlan::Projection(projection) => {
793            let cast_expr = projection.expr[0]
794                .clone()
795                .cast_to(cast_to_type, projection.input.schema())?;
796            LogicalPlan::Projection(Projection::try_new(
797                vec![cast_expr],
798                Arc::clone(&projection.input),
799            )?)
800        }
801        _ => {
802            let cast_expr = Expr::Column(Column::from(plan.schema().qualified_field(0)))
803                .cast_to(cast_to_type, subquery.subquery.schema())?;
804            LogicalPlan::Projection(Projection::try_new(
805                vec![cast_expr],
806                subquery.subquery,
807            )?)
808        }
809    };
810    Ok(Subquery {
811        subquery: Arc::new(new_plan),
812        outer_ref_columns: subquery.outer_ref_columns,
813        spans: Spans::new(),
814    })
815}
816
#[cfg(test)]
mod tests {
    // Unit tests for nullability, data type and metadata inference on `Expr`.
    use std::collections::HashMap;

    use super::*;
    use crate::{and, col, lit, not, or, out_ref_col_with_metadata, when};

    use arrow::datatypes::FieldRef;
    use datafusion_common::{DFSchema, ScalarValue, assert_or_internal_err};

    // Builds `NULL.<$EXPR_TYPE>()` (e.g. `NULL IS NULL`) and asserts that the
    // resulting predicate is reported as non-nullable.
    macro_rules! test_is_expr_nullable {
        ($EXPR_TYPE:ident) => {{
            let expr = lit(ScalarValue::Null).$EXPR_TYPE();
            assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        }};
    }

    /// `foo = 1` is nullable only when `foo` is. The `IS [NOT] NULL` and
    /// `IS [NOT] TRUE/FALSE/UNKNOWN` predicates are non-nullable even when
    /// applied to a NULL literal.
    #[test]
    fn expr_schema_nullability() {
        let expr = col("foo").eq(lit(1));
        assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        assert!(
            expr.nullable(&MockExprSchema::new().with_nullable(true))
                .unwrap()
        );

        test_is_expr_nullable!(is_null);
        test_is_expr_nullable!(is_not_null);
        test_is_expr_nullable!(is_true);
        test_is_expr_nullable!(is_not_true);
        test_is_expr_nullable!(is_false);
        test_is_expr_nullable!(is_not_false);
        test_is_expr_nullable!(is_unknown);
        test_is_expr_nullable!(is_not_unknown);
    }

    /// `BETWEEN` is nullable when the tested column is nullable or when either
    /// bound is a NULL literal.
    #[test]
    fn test_between_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        let expr = col("foo").between(lit(1), lit(2));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        let null = lit(ScalarValue::Int32(None));

        let expr = col("foo").between(null.clone(), lit(2));
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(lit(1), null.clone());
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(null.clone(), null);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    /// Asserts `expr.nullable(schema) == expected`, naming the expression on failure.
    fn assert_nullability(expr: &Expr, schema: &dyn ExprSchema, expected: bool) {
        assert_eq!(
            expr.nullable(schema).unwrap(),
            expected,
            "Nullability of '{expr}' should be {expected}"
        );
    }

    fn assert_not_nullable(expr: &Expr, schema: &dyn ExprSchema) {
        assert_nullability(expr, schema, false);
    }

    fn assert_nullable(expr: &Expr, schema: &dyn ExprSchema) {
        assert_nullability(expr, schema, true);
    }

    /// `CASE` nullability must take the `WHEN` condition into account. The
    /// expectations below treat a `THEN x` branch as non-nullable whenever its
    /// condition can only be true if `x` is not NULL.
    #[test]
    fn test_case_expression_nullability() -> Result<()> {
        // The mock schema resolves every column (here `x` and `bar`) to the
        // same Int32 field; the two schemas differ only in its nullability.
        let nullable_schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_nullable(true);

        let not_nullable_schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_nullable(false);

        // CASE WHEN x IS NOT NULL THEN x ELSE 0
        let e = when(col("x").is_not_null(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN NOT x IS NULL THEN x ELSE 0
        let e = when(not(col("x").is_null()), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 THEN x ELSE 0
        let e = when(col("x").eq(lit(5)), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT NULL AND x = 5 THEN x ELSE 0
        let e = when(and(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 AND x IS NOT NULL THEN x ELSE 0
        let e = when(and(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT NULL OR x = 5 THEN x ELSE 0
        let e = when(or(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 OR x IS NOT NULL THEN x ELSE 0
        let e = when(or(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN (x = 5 AND x IS NOT NULL) OR (x = bar AND x IS NOT NULL) THEN x ELSE 0
        let e = when(
            or(
                and(col("x").eq(lit(5)), col("x").is_not_null()),
                and(col("x").eq(col("bar")), col("x").is_not_null()),
            ),
            col("x"),
        )
        .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 OR x IS NULL THEN x ELSE 0
        // (the `x IS NULL` disjunct lets the THEN branch see a NULL `x`)
        let e = when(or(col("x").eq(lit(5)), col("x").is_null()), col("x"))
            .otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS TRUE THEN x ELSE 0
        let e = when(col("x").is_true(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT TRUE THEN x ELSE 0
        let e = when(col("x").is_not_true(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS FALSE THEN x ELSE 0
        let e = when(col("x").is_false(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT FALSE THEN x ELSE 0
        let e = when(col("x").is_not_false(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS UNKNOWN THEN x ELSE 0
        let e = when(col("x").is_unknown(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT UNKNOWN THEN x ELSE 0
        let e = when(col("x").is_not_unknown(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x LIKE 'x' THEN x ELSE 0
        let e = when(col("x").like(lit("x")), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN 0 THEN x ELSE 0
        // (constant condition; the expectation is that the THEN branch is ignored)
        let e = when(lit(0), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN 1 THEN x ELSE 0
        let e = when(lit(1), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        Ok(())
    }

    /// `IN (...)` nullability follows the tested column and the list elements,
    /// and errors from the schema are propagated.
    #[test]
    fn test_inlist_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        let expr = col("foo").in_list(vec![lit(1); 5], false);
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());
        // An error raised by the schema's `nullable` must surface from `Expr::nullable`.
        assert!(
            expr.nullable(&get_schema(false).with_error_on_nullable(true))
                .is_err()
        );

        let null = lit(ScalarValue::Int32(None));
        let expr = col("foo").in_list(vec![null, lit(1)], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());

        // Testing on long list
        // NOTE(review): six non-null literals are expected to be nullable while
        // the five-element list above is not. This appears to pin a list-length
        // cutoff in the `InList` nullability logic (not visible here); confirm.
        let expr = col("foo").in_list(vec![lit(1); 6], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    /// `LIKE` is nullable when the column is nullable or the pattern is a NULL literal.
    #[test]
    fn test_like_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Utf8)
                .with_nullable(nullable)
        };

        let expr = col("foo").like(lit("bar"));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        let expr = col("foo").like(lit(ScalarValue::Utf8(None)));
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    /// A column's data type is taken from the schema.
    #[test]
    fn expr_schema_data_type() {
        let expr = col("foo");
        assert_eq!(
            DataType::Utf8,
            expr.get_type(&MockExprSchema::new().with_data_type(DataType::Utf8))
                .unwrap()
        );
    }

    /// Field metadata is propagated through columns, aliases, casts and outer
    /// references, for both the mock schema and a real `DFSchema`.
    #[test]
    fn test_expr_metadata() {
        let mut meta = HashMap::new();
        meta.insert("bar".to_string(), "buzz".to_string());
        let meta = FieldMetadata::from(meta);
        let expr = col("foo");
        let schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_metadata(meta.clone());

        // col, alias, and cast should be metadata-preserving
        assert_eq!(meta, expr.metadata(&schema).unwrap());
        assert_eq!(meta, expr.clone().alias("bar").metadata(&schema).unwrap());
        assert_eq!(
            meta,
            expr.clone()
                .cast_to(&DataType::Int64, &schema)
                .unwrap()
                .metadata(&schema)
                .unwrap()
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![meta.add_to_field(Field::new("foo", DataType::Int32, true))].into(),
            HashMap::new(),
        )
        .unwrap();

        // verify to_field method populates metadata
        assert_eq!(meta, expr.metadata(&schema).unwrap());

        // outer ref constructed by `out_ref_col_with_metadata` should be metadata-preserving
        let outer_ref = out_ref_col_with_metadata(
            DataType::Int32,
            meta.to_hashmap(),
            Column::from_name("foo"),
        );
        assert_eq!(meta, outer_ref.metadata(&schema).unwrap());
    }

    /// A placeholder with an explicit field reports that field's type,
    /// nullability and metadata, both directly and when aliased.
    #[test]
    fn test_expr_placeholder() {
        let schema = MockExprSchema::new();

        let mut placeholder_meta = HashMap::new();
        placeholder_meta.insert("bar".to_string(), "buzz".to_string());
        let placeholder_meta = FieldMetadata::from(placeholder_meta);

        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(
                Field::new("", DataType::Utf8, true)
                    .with_metadata(placeholder_meta.to_hashmap())
                    .into(),
            ),
        ));

        let field = expr.to_field(&schema).unwrap().1;
        assert_eq!(
            (field.data_type(), field.is_nullable()),
            (&DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr.metadata(&schema).unwrap());

        let expr_alias = expr.alias("a placeholder by any other name");
        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
            (&DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr_alias.metadata(&schema).unwrap());

        // Non-nullable placeholder field should remain non-nullable
        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(Field::new("", DataType::Utf8, false).into()),
        ));
        let expr_field = expr.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_field.data_type(), expr_field.is_nullable()),
            (&DataType::Utf8, false)
        );

        let expr_alias = expr.alias("a placeholder by any other name");
        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
            (&DataType::Utf8, false)
        );
    }

    /// Test-only [`ExprSchema`] that resolves every column, whatever its name,
    /// to the same single field.
    #[derive(Debug)]
    struct MockExprSchema {
        /// The field returned for any column.
        field: FieldRef,
        /// When true, `nullable` returns an internal error instead of a value.
        error_on_nullable: bool,
    }

    impl MockExprSchema {
        /// A non-nullable field named `mock_field` of type `Null`.
        fn new() -> Self {
            Self {
                field: Arc::new(Field::new("mock_field", DataType::Null, false)),
                error_on_nullable: false,
            }
        }

        fn with_nullable(mut self, nullable: bool) -> Self {
            Arc::make_mut(&mut self.field).set_nullable(nullable);
            self
        }

        fn with_data_type(mut self, data_type: DataType) -> Self {
            Arc::make_mut(&mut self.field).set_data_type(data_type);
            self
        }

        fn with_error_on_nullable(mut self, error_on_nullable: bool) -> Self {
            self.error_on_nullable = error_on_nullable;
            self
        }

        /// Attaches `metadata` to the mock field via `FieldMetadata::add_to_field`.
        fn with_metadata(mut self, metadata: FieldMetadata) -> Self {
            self.field =
                Arc::new(metadata.add_to_field(Arc::unwrap_or_clone(self.field)));
            self
        }
    }

    impl ExprSchema for MockExprSchema {
        // The column argument is ignored: every column shares the mock field.
        fn nullable(&self, _col: &Column) -> Result<bool> {
            assert_or_internal_err!(!self.error_on_nullable, "nullable error");
            Ok(self.field.is_nullable())
        }

        fn field_from_column(&self, _col: &Column) -> Result<&FieldRef> {
            Ok(&self.field)
        }
    }

    /// A scalar variable reports the metadata of the field it was built with,
    /// independently of the schema.
    #[test]
    fn test_scalar_variable() {
        let mut meta = HashMap::new();
        meta.insert("bar".to_string(), "buzz".to_string());
        let meta = FieldMetadata::from(meta);

        let field = Field::new("foo", DataType::Int32, true);
        let field = meta.add_to_field(field);
        let field = Arc::new(field);

        let expr = Expr::ScalarVariable(field, vec!["foo".to_string()]);

        let schema = MockExprSchema::new();

        assert_eq!(meta, expr.metadata(&schema).unwrap());
    }
}