Skip to main content

datafusion_expr/
expr_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{Between, Expr, Like, predicate_bounds};
19use crate::ValueOrLambda;
20use crate::expr::{
21    AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
22    InSubquery, Lambda, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
23    WindowFunctionParams,
24};
25use crate::expr::{FieldMetadata, LambdaVariable};
26use crate::higher_order_function::HigherOrderReturnFieldArgs;
27use crate::type_coercion::functions::value_fields_with_higher_order_udf_and_lambdas;
28use crate::type_coercion::functions::{UDFCoercionExt, fields_with_udf};
29use crate::udf::ReturnFieldArgs;
30use crate::{LogicalPlan, Projection, Subquery, WindowFunctionDefinition, utils};
31use arrow::compute::can_cast_types;
32use arrow::datatypes::FieldRef;
33use arrow::datatypes::{DataType, Field};
34use datafusion_common::datatype::FieldExt;
35use datafusion_common::{
36    Column, DataFusionError, ExprSchema, Result, ScalarValue, Spans, TableReference,
37    not_impl_err, plan_datafusion_err, plan_err,
38};
39use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
40use datafusion_functions_window_common::field::WindowUDFFieldArgs;
41use std::sync::Arc;
42
/// Trait that allows an expression to be typed with respect to a schema.
///
/// Every method receives the [`ExprSchema`] against which the expression is
/// resolved and reports failures through [`Result`].
pub trait ExprSchemable {
    /// Given a schema, return the data type of the expr
    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType>;

    /// Given a schema, return the nullability of the expr
    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool>;

    /// Given a schema, return the expr's optional metadata
    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata>;

    /// Convert to a field with respect to a schema.
    ///
    /// Returns the optional table reference together with the field.
    fn to_field(
        &self,
        input_schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)>;

    /// Cast to a type with respect to a schema.
    ///
    /// Consumes the expression and returns the resulting expression.
    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr>;

    /// Given a schema, return the type and nullability of the expr
    #[deprecated(
        since = "51.0.0",
        note = "Use `to_field().1.is_nullable` and `to_field().1.data_type()` directly instead"
    )]
    fn data_type_and_nullable(&self, schema: &dyn ExprSchema)
    -> Result<(DataType, bool)>;
}
71
72/// Derives the output field for a cast expression from the source field.
73/// For `TryCast`, `force_nullable` is `true` since a failed cast returns NULL.
74fn cast_output_field(
75    source_field: &FieldRef,
76    target_type: &DataType,
77    force_nullable: bool,
78) -> Arc<Field> {
79    let mut f = source_field
80        .as_ref()
81        .clone()
82        .with_data_type(target_type.clone())
83        .with_metadata(source_field.metadata().clone());
84    if force_nullable {
85        f = f.with_nullable(true);
86    }
87    Arc::new(f)
88}
89
90impl ExprSchemable for Expr {
91    /// Returns the [arrow::datatypes::DataType] of the expression
92    /// based on [ExprSchema]
93    ///
94    /// Note: [`DFSchema`] implements [ExprSchema].
95    ///
96    /// [`DFSchema`]: datafusion_common::DFSchema
97    ///
98    /// # Examples
99    ///
100    /// Get the type of an expression that adds 2 columns. Adding an Int32
101    /// and Float32 results in Float32 type
102    ///
103    /// ```
104    /// # use arrow::datatypes::{DataType, Field};
105    /// # use datafusion_common::DFSchema;
106    /// # use datafusion_expr::{col, ExprSchemable};
107    /// # use std::collections::HashMap;
108    ///
109    /// fn main() {
110    ///     let expr = col("c1") + col("c2");
111    ///     let schema = DFSchema::from_unqualified_fields(
112    ///         vec![
113    ///             Field::new("c1", DataType::Int32, true),
114    ///             Field::new("c2", DataType::Float32, true),
115    ///         ]
116    ///         .into(),
117    ///         HashMap::new(),
118    ///     )
119    ///     .unwrap();
120    ///     assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap()));
121    /// }
122    /// ```
123    ///
124    /// # Errors
125    ///
126    /// This function errors when it is not possible to compute its
127    /// [arrow::datatypes::DataType].  This happens when e.g. the
128    /// expression refers to a column that does not exist in the
129    /// schema, or when the expression is incorrectly typed
130    /// (e.g. `[utf8] + [bool]`).
131    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
132    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
133        match self {
134            Expr::Alias(Alias { expr, name, .. }) => match &**expr {
135                Expr::Placeholder(Placeholder { field, .. }) => match &field {
136                    None => schema.data_type(&Column::from_name(name)).cloned(),
137                    Some(field) => Ok(field.data_type().clone()),
138                },
139                _ => expr.get_type(schema),
140            },
141            Expr::Negative(expr) => expr.get_type(schema),
142            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
143            Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
144            Expr::ScalarVariable(field, _) => Ok(field.data_type().clone()),
145            Expr::Literal(l, _) => Ok(l.data_type()),
146            Expr::Case(case) => {
147                for (_, then_expr) in &case.when_then_expr {
148                    let then_type = then_expr.get_type(schema)?;
149                    if !then_type.is_null() {
150                        return Ok(then_type);
151                    }
152                }
153                case.else_expr
154                    .as_ref()
155                    .map_or(Ok(DataType::Null), |e| e.get_type(schema))
156            }
157            Expr::Cast(Cast { field, .. }) | Expr::TryCast(TryCast { field, .. }) => {
158                Ok(field.data_type().clone())
159            }
160            Expr::Unnest(Unnest { expr }) => {
161                let arg_data_type = expr.get_type(schema)?;
162                // Unnest's output type is the inner type of the list
163                match arg_data_type {
164                    DataType::List(field)
165                    | DataType::LargeList(field)
166                    | DataType::FixedSizeList(field, _)
167                    | DataType::ListView(field)
168                    | DataType::LargeListView(field) => Ok(field.data_type().clone()),
169                    DataType::Struct(_) => Ok(arg_data_type),
170                    DataType::Null => {
171                        not_impl_err!("unnest() does not support null yet")
172                    }
173                    _ => {
174                        plan_err!(
175                            "unnest() can only be applied to array, struct and null"
176                        )
177                    }
178                }
179            }
180            Expr::ScalarFunction(_)
181            | Expr::WindowFunction(_)
182            | Expr::AggregateFunction(_) => {
183                Ok(self.to_field(schema)?.1.data_type().clone())
184            }
185            Expr::Not(_)
186            | Expr::IsNull(_)
187            | Expr::Exists { .. }
188            | Expr::InSubquery(_)
189            | Expr::SetComparison(_)
190            | Expr::Between { .. }
191            | Expr::InList { .. }
192            | Expr::IsNotNull(_)
193            | Expr::IsTrue(_)
194            | Expr::IsFalse(_)
195            | Expr::IsUnknown(_)
196            | Expr::IsNotTrue(_)
197            | Expr::IsNotFalse(_)
198            | Expr::IsNotUnknown(_) => Ok(DataType::Boolean),
199            Expr::ScalarSubquery(subquery) => {
200                Ok(subquery.subquery.schema().field(0).data_type().clone())
201            }
202            Expr::BinaryExpr(BinaryExpr { left, right, op }) => BinaryTypeCoercer::new(
203                &left.get_type(schema)?,
204                op,
205                &right.get_type(schema)?,
206            )
207            .get_result_type(),
208            Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
209            Expr::Placeholder(Placeholder { field, .. }) => {
210                if let Some(field) = field {
211                    Ok(field.data_type().clone())
212                } else {
213                    // If the placeholder's type hasn't been specified, treat it as
214                    // null (unspecified placeholders generate an error during planning)
215                    Ok(DataType::Null)
216                }
217            }
218            #[expect(deprecated)]
219            Expr::Wildcard { .. } => Ok(DataType::Null),
220            Expr::GroupingSet(_) => {
221                // Grouping sets do not really have a type and do not appear in projections
222                Ok(DataType::Null)
223            }
224            Expr::HigherOrderFunction(_func) => {
225                Ok(self.to_field(schema)?.1.data_type().clone())
226            }
227            Expr::Lambda(_lambda) => Ok(DataType::Null),
228            Expr::LambdaVariable(LambdaVariable { field, .. }) => match field {
229                Some(f) => Ok(f.data_type().clone()),
230                // If the lambda variable's field hasn't been specified, treat it as
231                // null (unspecified lambda variables generate an error during planning)
232                None => Ok(DataType::Null),
233            },
234        }
235    }
236
237    /// Returns the nullability of the expression based on [ExprSchema].
238    ///
239    /// Note: [`DFSchema`] implements [ExprSchema].
240    ///
241    /// [`DFSchema`]: datafusion_common::DFSchema
242    ///
243    /// # Errors
244    ///
245    /// This function errors when it is not possible to compute its
246    /// nullability.  This happens when the expression refers to a
247    /// column that does not exist in the schema.
248    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool> {
249        match self {
250            Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) | Expr::Negative(expr) => {
251                expr.nullable(input_schema)
252            }
253
254            Expr::InList(InList { expr, list, .. }) => {
255                // Avoid inspecting too many expressions.
256                const MAX_INSPECT_LIMIT: usize = 6;
257                // Stop if a nullable expression is found or an error occurs.
258                let has_nullable = std::iter::once(expr.as_ref())
259                    .chain(list)
260                    .take(MAX_INSPECT_LIMIT)
261                    .find_map(|e| {
262                        e.nullable(input_schema)
263                            .map(|nullable| if nullable { Some(()) } else { None })
264                            .transpose()
265                    })
266                    .transpose()?;
267                Ok(match has_nullable {
268                    // If a nullable subexpression is found, the result may also be nullable.
269                    Some(_) => true,
270                    // If the list is too long, we assume it is nullable.
271                    None if list.len() + 1 > MAX_INSPECT_LIMIT => true,
272                    // All the subexpressions are non-nullable, so the result must be non-nullable.
273                    _ => false,
274                })
275            }
276
277            Expr::Between(Between {
278                expr, low, high, ..
279            }) => Ok(expr.nullable(input_schema)?
280                || low.nullable(input_schema)?
281                || high.nullable(input_schema)?),
282
283            Expr::Column(c) => input_schema.nullable(c),
284            Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
285            Expr::Literal(value, _) => Ok(value.is_null()),
286            Expr::Case(case) => {
287                let nullable_then = case
288                    .when_then_expr
289                    .iter()
290                    .filter_map(|(w, t)| {
291                        let is_nullable = match t.nullable(input_schema) {
292                            Err(e) => return Some(Err(e)),
293                            Ok(n) => n,
294                        };
295
296                        // Branches with a then expression that is not nullable do not impact the
297                        // nullability of the case expression.
298                        if !is_nullable {
299                            return None;
300                        }
301
302                        // For case-with-expression assume all 'then' expressions are reachable
303                        if case.expr.is_some() {
304                            return Some(Ok(()));
305                        }
306
307                        // For branches with a nullable 'then' expression, try to determine
308                        // if the 'then' expression is ever reachable in the situation where
309                        // it would evaluate to null.
310                        let bounds = match predicate_bounds::evaluate_bounds(
311                            w,
312                            Some(unwrap_certainly_null_expr(t)),
313                            input_schema,
314                        ) {
315                            Err(e) => return Some(Err(e)),
316                            Ok(b) => b,
317                        };
318
319                        let can_be_true = match bounds
320                            .contains_value(ScalarValue::Boolean(Some(true)))
321                        {
322                            Err(e) => return Some(Err(e)),
323                            Ok(b) => b,
324                        };
325
326                        if !can_be_true {
327                            // If the derived 'when' expression can never evaluate to true, the
328                            // 'then' expression is not reachable when it would evaluate to NULL.
329                            // The most common pattern for this is `WHEN x IS NOT NULL THEN x`.
330                            None
331                        } else {
332                            // The branch might be taken
333                            Some(Ok(()))
334                        }
335                    })
336                    .next();
337
338                if let Some(nullable_then) = nullable_then {
339                    // There is at least one reachable nullable 'then' expression, so the case
340                    // expression itself is nullable.
341                    // Use `Result::map` to propagate the error from `nullable_then` if there is one.
342                    nullable_then.map(|_| true)
343                } else if let Some(e) = &case.else_expr {
344                    // There are no reachable nullable 'then' expressions, so all we still need to
345                    // check is the 'else' expression's nullability.
346                    e.nullable(input_schema)
347                } else {
348                    // CASE produces NULL if there is no `else` expr
349                    // (aka when none of the `when_then_exprs` match)
350                    Ok(true)
351                }
352            }
353            Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
354            Expr::ScalarFunction(_)
355            | Expr::AggregateFunction(_)
356            | Expr::WindowFunction(_) => Ok(self.to_field(input_schema)?.1.is_nullable()),
357            Expr::ScalarVariable(field, _) => Ok(field.is_nullable()),
358            Expr::TryCast { .. } | Expr::Unnest(_) | Expr::Placeholder(_) => Ok(true),
359            Expr::IsNull(_)
360            | Expr::IsNotNull(_)
361            | Expr::IsTrue(_)
362            | Expr::IsFalse(_)
363            | Expr::IsUnknown(_)
364            | Expr::IsNotTrue(_)
365            | Expr::IsNotFalse(_)
366            | Expr::IsNotUnknown(_)
367            | Expr::Exists { .. } => Ok(false),
368            Expr::SetComparison(_) => Ok(true),
369            Expr::InSubquery(InSubquery { expr, .. }) => expr.nullable(input_schema),
370            Expr::ScalarSubquery(subquery) => {
371                Ok(subquery.subquery.schema().field(0).is_nullable())
372            }
373            Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
374                Ok(left.nullable(input_schema)? || right.nullable(input_schema)?)
375            }
376            Expr::Like(Like { expr, pattern, .. })
377            | Expr::SimilarTo(Like { expr, pattern, .. }) => {
378                Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
379            }
380            #[expect(deprecated)]
381            Expr::Wildcard { .. } => Ok(false),
382            Expr::GroupingSet(_) => {
383                // Grouping sets do not really have the concept of nullable and do not appear
384                // in projections
385                Ok(true)
386            }
387            Expr::HigherOrderFunction(_func) => {
388                Ok(self.to_field(input_schema)?.1.is_nullable())
389            }
390            Expr::Lambda(_lambda) => Ok(true),
391            Expr::LambdaVariable(LambdaVariable { field, .. }) => match field {
392                Some(f) => Ok(f.is_nullable()),
393                // If the lambda variable's field hasn't been specified, treat it as
394                // null (unspecified lambda variables generate an error during planning)
395                None => Ok(true),
396            },
397        }
398    }
399
400    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata> {
401        self.to_field(schema)
402            .map(|(_, field)| FieldMetadata::from(field.metadata()))
403    }
404
405    /// Returns the datatype and nullability of the expression based on [ExprSchema].
406    ///
407    /// Note: [`DFSchema`] implements [ExprSchema].
408    ///
409    /// [`DFSchema`]: datafusion_common::DFSchema
410    ///
411    /// # Errors
412    ///
413    /// This function errors when it is not possible to compute its
414    /// datatype or nullability.
415    fn data_type_and_nullable(
416        &self,
417        schema: &dyn ExprSchema,
418    ) -> Result<(DataType, bool)> {
419        let field = self.to_field(schema)?.1;
420
421        Ok((field.data_type().clone(), field.is_nullable()))
422    }
423
424    /// Returns a [arrow::datatypes::Field] compatible with this expression.
425    ///
426    /// This function converts an expression into a field with appropriate metadata
427    /// and nullability based on the expression type and context. It is the primary
428    /// mechanism for determining field-level schemas.
429    ///
430    /// # Field Property Resolution
431    ///
432    /// For each expression, the following properties are determined:
433    ///
434    /// ## Data Type Resolution
435    /// - **Column references**: Data type from input schema field
436    /// - **Literals**: Data type inferred from literal value
437    /// - **Aliases**: Data type inherited from the underlying expression (the aliased expression)
438    /// - **Binary expressions**: Result type from type coercion rules
439    /// - **Boolean expressions**: Always a boolean type
440    /// - **Cast expressions**: Target data type from cast operation
441    /// - **Function calls**: Return type based on function signature and argument types
442    ///
443    /// ## Nullability Determination
444    /// - **Column references**: Inherit nullability from input schema field
445    /// - **Literals**: Nullable only if literal value is NULL
446    /// - **Aliases**: Inherit nullability from the underlying expression (the aliased expression)
447    /// - **Binary expressions**: Nullable if either operand is nullable
448    /// - **Boolean expressions**: Always non-nullable (IS NULL, EXISTS, etc.)
449    /// - **Cast expressions**: determined by the input expression's nullability rules
450    /// - **Function calls**: Based on function nullability rules and input nullability
451    ///
452    /// ## Metadata Handling
453    /// - **Column references**: Preserve original field metadata from input schema
454    /// - **Literals**: Use explicitly provided metadata, otherwise empty
455    /// - **Aliases**: Merge underlying expr metadata with alias-specific metadata, preferring the alias metadata
456    /// - **Binary expressions**: field metadata is empty
457    /// - **Boolean expressions**: field metadata is empty
458    /// - **Cast expressions**: determined by the input expression's field metadata handling
459    /// - **Scalar functions**: Generate metadata via function's [`return_field_from_args`] method,
460    ///   with the default implementation returning empty field metadata
461    /// - **Aggregate functions**: Generate metadata via function's [`return_field`] method,
462    ///   with the default implementation returning empty field metadata
463    /// - **Window functions**: field metadata follows the function's return field
464    ///
465    /// ## Table Reference Scoping
466    /// - Establishes proper qualified field references when columns belong to specific tables
467    /// - Maintains table context for accurate field resolution in multi-table scenarios
468    ///
469    /// So for example, a projected expression `col(c1) + col(c2)` is
470    /// placed in an output field **named** col("c1 + c2")
471    ///
472    /// [`return_field_from_args`]: crate::ScalarUDF::return_field_from_args
473    /// [`return_field`]: crate::AggregateUDF::return_field
474    fn to_field(
475        &self,
476        schema: &dyn ExprSchema,
477    ) -> Result<(Option<TableReference>, Arc<Field>)> {
478        let (relation, schema_name) = self.qualified_name();
479        #[expect(deprecated)]
480        let field = match self {
481            Expr::Alias(Alias {
482                expr,
483                name: _,
484                metadata,
485                ..
486            }) => {
487                let mut combined_metadata = expr.metadata(schema)?;
488                if let Some(metadata) = metadata {
489                    combined_metadata.extend(metadata.clone());
490                }
491
492                Ok(expr
493                    .to_field(schema)
494                    .map(|(_, f)| f)?
495                    .with_field_metadata(&combined_metadata))
496            }
497            Expr::Negative(expr) => expr.to_field(schema).map(|(_, f)| f),
498            Expr::Column(c) => schema.field_from_column(c).map(Arc::clone),
499            Expr::OuterReferenceColumn(field, _) => {
500                Ok(Arc::clone(field).renamed(&schema_name))
501            }
502            Expr::ScalarVariable(field, _) => Ok(Arc::clone(field).renamed(&schema_name)),
503            Expr::Literal(l, metadata) => Ok(Arc::new(
504                Field::new(&schema_name, l.data_type(), l.is_null())
505                    .with_field_metadata_opt(metadata.as_ref()),
506            )),
507            Expr::IsNull(_)
508            | Expr::IsNotNull(_)
509            | Expr::IsTrue(_)
510            | Expr::IsFalse(_)
511            | Expr::IsUnknown(_)
512            | Expr::IsNotTrue(_)
513            | Expr::IsNotFalse(_)
514            | Expr::IsNotUnknown(_)
515            | Expr::Exists { .. } => {
516                Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false)))
517            }
518            Expr::ScalarSubquery(subquery) => {
519                Ok(Arc::clone(&subquery.subquery.schema().fields()[0]))
520            }
521            Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
522                let (left_field, right_field) =
523                    (left.to_field(schema)?.1, right.to_field(schema)?.1);
524
525                let (lhs_type, lhs_nullable) =
526                    (left_field.data_type(), left_field.is_nullable());
527                let (rhs_type, rhs_nullable) =
528                    (right_field.data_type(), right_field.is_nullable());
529                let mut coercer = BinaryTypeCoercer::new(lhs_type, op, rhs_type);
530                coercer.set_lhs_spans(left.spans().cloned().unwrap_or_default());
531                coercer.set_rhs_spans(right.spans().cloned().unwrap_or_default());
532                Ok(Arc::new(Field::new(
533                    &schema_name,
534                    coercer.get_result_type()?,
535                    lhs_nullable || rhs_nullable,
536                )))
537            }
538            Expr::WindowFunction(window_function) => {
539                let WindowFunction {
540                    fun,
541                    params: WindowFunctionParams { args, .. },
542                    ..
543                } = window_function.as_ref();
544
545                let fields = args
546                    .iter()
547                    .map(|e| e.to_field(schema).map(|(_, f)| f))
548                    .collect::<Result<Vec<_>>>()?;
549                match fun {
550                    WindowFunctionDefinition::AggregateUDF(udaf) => {
551                        let new_fields =
552                            verify_function_arguments(udaf.as_ref(), &fields)?;
553                        let return_field = udaf.return_field(&new_fields)?;
554                        Ok(return_field)
555                    }
556                    WindowFunctionDefinition::WindowUDF(udwf) => {
557                        let new_fields =
558                            verify_function_arguments(udwf.as_ref(), &fields)?;
559                        let return_field = udwf
560                            .field(WindowUDFFieldArgs::new(&new_fields, &schema_name))?;
561                        Ok(return_field)
562                    }
563                }
564            }
565            Expr::AggregateFunction(AggregateFunction {
566                func,
567                params: AggregateFunctionParams { args, .. },
568            }) => {
569                let fields = args
570                    .iter()
571                    .map(|e| e.to_field(schema).map(|(_, f)| f))
572                    .collect::<Result<Vec<_>>>()?;
573                let new_fields = verify_function_arguments(func.as_ref(), &fields)?;
574                func.return_field(&new_fields)
575            }
576            Expr::ScalarFunction(ScalarFunction { func, args }) => {
577                let fields = args
578                    .iter()
579                    .map(|e| e.to_field(schema).map(|(_, f)| f))
580                    .collect::<Result<Vec<_>>>()?;
581                let new_fields = verify_function_arguments(func.as_ref(), &fields)?;
582
583                let arguments = args
584                    .iter()
585                    .map(|e| match e {
586                        Expr::Literal(sv, _) => Some(sv),
587                        _ => None,
588                    })
589                    .collect::<Vec<_>>();
590                let args = ReturnFieldArgs {
591                    arg_fields: &new_fields,
592                    scalar_arguments: &arguments,
593                };
594
595                func.return_field_from_args(args)
596            }
597            // _ => Ok((self.get_type(schema)?, self.nullable(schema)?)),
598            Expr::Cast(Cast { expr, field }) => {
599                expr.to_field(schema).map(|(_table_ref, src)| {
600                    cast_output_field(&src, field.data_type(), false)
601                })
602            }
603            Expr::Placeholder(Placeholder {
604                id: _,
605                field: Some(field),
606            }) => Ok(Arc::clone(field).renamed(&schema_name)),
607            Expr::TryCast(TryCast { expr, field }) => {
608                expr.to_field(schema).map(|(_table_ref, src)| {
609                    cast_output_field(&src, field.data_type(), true)
610                })
611            }
612            Expr::LambdaVariable(LambdaVariable {
613                field: Some(field), ..
614            }) => Ok(Arc::clone(field).renamed(&schema_name)),
615            Expr::Like(_)
616            | Expr::SimilarTo(_)
617            | Expr::Not(_)
618            | Expr::Between(_)
619            | Expr::Case(_)
620            | Expr::InList(_)
621            | Expr::InSubquery(_)
622            | Expr::SetComparison(_)
623            | Expr::Wildcard { .. }
624            | Expr::GroupingSet(_)
625            | Expr::Placeholder(_)
626            | Expr::Unnest(_)
627            | Expr::Lambda(_)
628            | Expr::LambdaVariable(_) => Ok(Arc::new(Field::new(
629                &schema_name,
630                self.get_type(schema)?,
631                self.nullable(schema)?,
632            ))),
633            Expr::HigherOrderFunction(func) => {
634                let arg_fields = func
635                    .args
636                    .iter()
637                    .map(|arg| match arg {
638                        Expr::Lambda(Lambda { params: _, body }) => {
639                            // use the name of the lambda instead of just the body to help with debugging
640                            Ok(ValueOrLambda::Lambda(Arc::new(Field::new(
641                                arg.qualified_name().1,
642                                body.get_type(schema)?,
643                                body.nullable(schema)?,
644                            ))))
645                        }
646                        _ => Ok(ValueOrLambda::Value(arg.to_field(schema)?.1)),
647                    })
648                    .collect::<Result<Vec<_>>>()?;
649
650                let new_fields = value_fields_with_higher_order_udf_and_lambdas(
651                    &arg_fields,
652                    func.func.as_ref(),
653                )?;
654
655                let arguments = func
656                    .args
657                    .iter()
658                    .map(|e| match e {
659                        Expr::Literal(sv, _) => Some(sv),
660                        _ => None,
661                    })
662                    .collect::<Vec<_>>();
663
664                let args = HigherOrderReturnFieldArgs {
665                    arg_fields: &new_fields,
666                    scalar_arguments: &arguments,
667                };
668
669                func.func.return_field_from_args(args)
670            }
671        }?;
672
673        Ok((
674            relation,
675            // todo avoid this rename / use the name above
676            field.renamed(&schema_name),
677        ))
678    }
679
680    /// Wraps this expression in a cast to a target [arrow::datatypes::DataType].
681    ///
682    /// # Errors
683    ///
684    /// This function errors when it is impossible to cast the
685    /// expression to the target [arrow::datatypes::DataType].
686    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr> {
687        let this_type = self.get_type(schema)?;
688        if this_type == *cast_to_type {
689            return Ok(self);
690        }
691
692        // TODO(kszucs): Most of the operations do not validate the type correctness
693        // like all of the binary expressions below. Perhaps Expr should track the
694        // type of the expression?
695
696        // Special handling for struct-to-struct casts with name-based field matching
697        let can_cast = match (&this_type, cast_to_type) {
698            (DataType::Struct(_), DataType::Struct(_)) => {
699                // Always allow struct-to-struct casts; field matching happens at runtime
700                true
701            }
702            _ => can_cast_types(&this_type, cast_to_type),
703        };
704
705        if can_cast {
706            match self {
707                Expr::ScalarSubquery(subquery) => {
708                    Ok(Expr::ScalarSubquery(cast_subquery(subquery, cast_to_type)?))
709                }
710                _ => Ok(Expr::Cast(Cast::new(Box::new(self), cast_to_type.clone()))),
711            }
712        } else {
713            plan_err!("Cannot automatically convert {this_type} to {cast_to_type}")
714        }
715    }
716}
717
718/// Verify that function is invoked with correct number and type of arguments as
719/// defined in `TypeSignature`.
720fn verify_function_arguments<F: UDFCoercionExt>(
721    function: &F,
722    input_fields: &[FieldRef],
723) -> Result<Vec<FieldRef>> {
724    fields_with_udf(input_fields, function).map_err(|err| {
725        let data_types = input_fields
726            .iter()
727            .map(|f| f.data_type())
728            .cloned()
729            .collect::<Vec<_>>();
730        plan_datafusion_err!(
731            "{}. {}",
732            match err {
733                DataFusionError::Plan(msg) => msg,
734                err => err.to_string(),
735            },
736            utils::generate_signature_error_message(
737                function.name(),
738                function.signature(),
739                &data_types
740            )
741        )
742    })
743}
744
745/// Returns the innermost [Expr] that is provably null if `expr` is null.
746fn unwrap_certainly_null_expr(expr: &Expr) -> &Expr {
747    match expr {
748        Expr::Not(e) => unwrap_certainly_null_expr(e),
749        Expr::Negative(e) => unwrap_certainly_null_expr(e),
750        Expr::Cast(e) => unwrap_certainly_null_expr(e.expr.as_ref()),
751        _ => expr,
752    }
753}
754
755/// Cast subquery in InSubquery/ScalarSubquery to a given type.
756///
757/// 1. **Projection plan**: If the subquery is a projection (i.e. a SELECT statement with specific
758///    columns), it casts the first expression in the projection to the target type and creates a
759///    new projection with the casted expression.
760/// 2. **Non-projection plan**: If the subquery isn't a projection, it adds a projection to the plan
761///    with the casted first column.
762pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
763    if subquery.subquery.schema().field(0).data_type() == cast_to_type {
764        return Ok(subquery);
765    }
766
767    let plan = subquery.subquery.as_ref();
768    let new_plan = match plan {
769        LogicalPlan::Projection(projection) => {
770            let cast_expr = projection.expr[0]
771                .clone()
772                .cast_to(cast_to_type, projection.input.schema())?;
773            LogicalPlan::Projection(Projection::try_new(
774                vec![cast_expr],
775                Arc::clone(&projection.input),
776            )?)
777        }
778        _ => {
779            let cast_expr = Expr::Column(Column::from(plan.schema().qualified_field(0)))
780                .cast_to(cast_to_type, subquery.subquery.schema())?;
781            LogicalPlan::Projection(Projection::try_new(
782                vec![cast_expr],
783                subquery.subquery,
784            )?)
785        }
786    };
787    Ok(Subquery {
788        subquery: Arc::new(new_plan),
789        outer_ref_columns: subquery.outer_ref_columns,
790        spans: Spans::new(),
791    })
792}
793
794#[cfg(test)]
795mod tests {
796    use std::collections::HashMap;
797
798    use super::*;
799    use crate::{and, col, lit, not, or, out_ref_col_with_metadata, when};
800
801    use datafusion_common::{DFSchema, assert_or_internal_err};
802
803    macro_rules! test_is_expr_nullable {
804        ($EXPR_TYPE:ident) => {{
805            let expr = lit(ScalarValue::Null).$EXPR_TYPE();
806            assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
807        }};
808    }
809
810    #[test]
811    fn expr_schema_nullability() {
812        let expr = col("foo").eq(lit(1));
813        assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
814        assert!(
815            expr.nullable(&MockExprSchema::new().with_nullable(true))
816                .unwrap()
817        );
818
819        test_is_expr_nullable!(is_null);
820        test_is_expr_nullable!(is_not_null);
821        test_is_expr_nullable!(is_true);
822        test_is_expr_nullable!(is_not_true);
823        test_is_expr_nullable!(is_false);
824        test_is_expr_nullable!(is_not_false);
825        test_is_expr_nullable!(is_unknown);
826        test_is_expr_nullable!(is_not_unknown);
827    }
828
829    #[test]
830    fn test_between_nullability() {
831        let get_schema = |nullable| {
832            MockExprSchema::new()
833                .with_data_type(DataType::Int32)
834                .with_nullable(nullable)
835        };
836
837        let expr = col("foo").between(lit(1), lit(2));
838        assert!(!expr.nullable(&get_schema(false)).unwrap());
839        assert!(expr.nullable(&get_schema(true)).unwrap());
840
841        let null = lit(ScalarValue::Int32(None));
842
843        let expr = col("foo").between(null.clone(), lit(2));
844        assert!(expr.nullable(&get_schema(false)).unwrap());
845
846        let expr = col("foo").between(lit(1), null.clone());
847        assert!(expr.nullable(&get_schema(false)).unwrap());
848
849        let expr = col("foo").between(null.clone(), null);
850        assert!(expr.nullable(&get_schema(false)).unwrap());
851    }
852
853    fn assert_nullability(expr: &Expr, schema: &dyn ExprSchema, expected: bool) {
854        assert_eq!(
855            expr.nullable(schema).unwrap(),
856            expected,
857            "Nullability of '{expr}' should be {expected}"
858        );
859    }
860
861    fn assert_not_nullable(expr: &Expr, schema: &dyn ExprSchema) {
862        assert_nullability(expr, schema, false);
863    }
864
865    fn assert_nullable(expr: &Expr, schema: &dyn ExprSchema) {
866        assert_nullability(expr, schema, true);
867    }
868
869    #[test]
870    fn test_case_expression_nullability() -> Result<()> {
871        let nullable_schema = MockExprSchema::new()
872            .with_data_type(DataType::Int32)
873            .with_nullable(true);
874
875        let not_nullable_schema = MockExprSchema::new()
876            .with_data_type(DataType::Int32)
877            .with_nullable(false);
878
879        // CASE WHEN x IS NOT NULL THEN x ELSE 0
880        let e = when(col("x").is_not_null(), col("x")).otherwise(lit(0))?;
881        assert_not_nullable(&e, &nullable_schema);
882        assert_not_nullable(&e, &not_nullable_schema);
883
884        // CASE WHEN NOT x IS NULL THEN x ELSE 0
885        let e = when(not(col("x").is_null()), col("x")).otherwise(lit(0))?;
886        assert_not_nullable(&e, &nullable_schema);
887        assert_not_nullable(&e, &not_nullable_schema);
888
889        // CASE WHEN X = 5 THEN x ELSE 0
890        let e = when(col("x").eq(lit(5)), col("x")).otherwise(lit(0))?;
891        assert_not_nullable(&e, &nullable_schema);
892        assert_not_nullable(&e, &not_nullable_schema);
893
894        // CASE WHEN x IS NOT NULL AND x = 5 THEN x ELSE 0
895        let e = when(and(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
896            .otherwise(lit(0))?;
897        assert_not_nullable(&e, &nullable_schema);
898        assert_not_nullable(&e, &not_nullable_schema);
899
900        // CASE WHEN x = 5 AND x IS NOT NULL THEN x ELSE 0
901        let e = when(and(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
902            .otherwise(lit(0))?;
903        assert_not_nullable(&e, &nullable_schema);
904        assert_not_nullable(&e, &not_nullable_schema);
905
906        // CASE WHEN x IS NOT NULL OR x = 5 THEN x ELSE 0
907        let e = when(or(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
908            .otherwise(lit(0))?;
909        assert_not_nullable(&e, &nullable_schema);
910        assert_not_nullable(&e, &not_nullable_schema);
911
912        // CASE WHEN x = 5 OR x IS NOT NULL THEN x ELSE 0
913        let e = when(or(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
914            .otherwise(lit(0))?;
915        assert_not_nullable(&e, &nullable_schema);
916        assert_not_nullable(&e, &not_nullable_schema);
917
918        // CASE WHEN (x = 5 AND x IS NOT NULL) OR (x = bar AND x IS NOT NULL) THEN x ELSE 0
919        let e = when(
920            or(
921                and(col("x").eq(lit(5)), col("x").is_not_null()),
922                and(col("x").eq(col("bar")), col("x").is_not_null()),
923            ),
924            col("x"),
925        )
926        .otherwise(lit(0))?;
927        assert_not_nullable(&e, &nullable_schema);
928        assert_not_nullable(&e, &not_nullable_schema);
929
930        // CASE WHEN x = 5 OR x IS NULL THEN x ELSE 0
931        let e = when(or(col("x").eq(lit(5)), col("x").is_null()), col("x"))
932            .otherwise(lit(0))?;
933        assert_nullable(&e, &nullable_schema);
934        assert_not_nullable(&e, &not_nullable_schema);
935
936        // CASE WHEN x IS TRUE THEN x ELSE 0
937        let e = when(col("x").is_true(), col("x")).otherwise(lit(0))?;
938        assert_not_nullable(&e, &nullable_schema);
939        assert_not_nullable(&e, &not_nullable_schema);
940
941        // CASE WHEN x IS NOT TRUE THEN x ELSE 0
942        let e = when(col("x").is_not_true(), col("x")).otherwise(lit(0))?;
943        assert_nullable(&e, &nullable_schema);
944        assert_not_nullable(&e, &not_nullable_schema);
945
946        // CASE WHEN x IS FALSE THEN x ELSE 0
947        let e = when(col("x").is_false(), col("x")).otherwise(lit(0))?;
948        assert_not_nullable(&e, &nullable_schema);
949        assert_not_nullable(&e, &not_nullable_schema);
950
951        // CASE WHEN x IS NOT FALSE THEN x ELSE 0
952        let e = when(col("x").is_not_false(), col("x")).otherwise(lit(0))?;
953        assert_nullable(&e, &nullable_schema);
954        assert_not_nullable(&e, &not_nullable_schema);
955
956        // CASE WHEN x IS UNKNOWN THEN x ELSE 0
957        let e = when(col("x").is_unknown(), col("x")).otherwise(lit(0))?;
958        assert_nullable(&e, &nullable_schema);
959        assert_not_nullable(&e, &not_nullable_schema);
960
961        // CASE WHEN x IS NOT UNKNOWN THEN x ELSE 0
962        let e = when(col("x").is_not_unknown(), col("x")).otherwise(lit(0))?;
963        assert_not_nullable(&e, &nullable_schema);
964        assert_not_nullable(&e, &not_nullable_schema);
965
966        // CASE WHEN x LIKE 'x' THEN x ELSE 0
967        let e = when(col("x").like(lit("x")), col("x")).otherwise(lit(0))?;
968        assert_not_nullable(&e, &nullable_schema);
969        assert_not_nullable(&e, &not_nullable_schema);
970
971        // CASE WHEN 0 THEN x ELSE 0
972        let e = when(lit(0), col("x")).otherwise(lit(0))?;
973        assert_not_nullable(&e, &nullable_schema);
974        assert_not_nullable(&e, &not_nullable_schema);
975
976        // CASE WHEN 1 THEN x ELSE 0
977        let e = when(lit(1), col("x")).otherwise(lit(0))?;
978        assert_nullable(&e, &nullable_schema);
979        assert_not_nullable(&e, &not_nullable_schema);
980
981        Ok(())
982    }
983
984    #[test]
985    fn test_inlist_nullability() {
986        let get_schema = |nullable| {
987            MockExprSchema::new()
988                .with_data_type(DataType::Int32)
989                .with_nullable(nullable)
990        };
991
992        let expr = col("foo").in_list(vec![lit(1); 5], false);
993        assert!(!expr.nullable(&get_schema(false)).unwrap());
994        assert!(expr.nullable(&get_schema(true)).unwrap());
995        // Testing nullable() returns an error.
996        assert!(
997            expr.nullable(&get_schema(false).with_error_on_nullable(true))
998                .is_err()
999        );
1000
1001        let null = lit(ScalarValue::Int32(None));
1002        let expr = col("foo").in_list(vec![null, lit(1)], false);
1003        assert!(expr.nullable(&get_schema(false)).unwrap());
1004
1005        // Testing on long list
1006        let expr = col("foo").in_list(vec![lit(1); 6], false);
1007        assert!(expr.nullable(&get_schema(false)).unwrap());
1008    }
1009
1010    #[test]
1011    fn test_like_nullability() {
1012        let get_schema = |nullable| {
1013            MockExprSchema::new()
1014                .with_data_type(DataType::Utf8)
1015                .with_nullable(nullable)
1016        };
1017
1018        let expr = col("foo").like(lit("bar"));
1019        assert!(!expr.nullable(&get_schema(false)).unwrap());
1020        assert!(expr.nullable(&get_schema(true)).unwrap());
1021
1022        let expr = col("foo").like(lit(ScalarValue::Utf8(None)));
1023        assert!(expr.nullable(&get_schema(false)).unwrap());
1024    }
1025
1026    #[test]
1027    fn expr_schema_data_type() {
1028        let expr = col("foo");
1029        assert_eq!(
1030            DataType::Utf8,
1031            expr.get_type(&MockExprSchema::new().with_data_type(DataType::Utf8))
1032                .unwrap()
1033        );
1034    }
1035
1036    #[test]
1037    fn test_expr_metadata() {
1038        let mut meta = HashMap::new();
1039        meta.insert("bar".to_string(), "buzz".to_string());
1040        let meta = FieldMetadata::from(meta);
1041        let expr = col("foo");
1042        let schema = MockExprSchema::new()
1043            .with_data_type(DataType::Int32)
1044            .with_metadata(meta.clone());
1045
1046        // col, alias, and cast should be metadata-preserving
1047        assert_eq!(meta, expr.metadata(&schema).unwrap());
1048        assert_eq!(meta, expr.clone().alias("bar").metadata(&schema).unwrap());
1049        assert_eq!(
1050            meta,
1051            expr.clone()
1052                .cast_to(&DataType::Int64, &schema)
1053                .unwrap()
1054                .metadata(&schema)
1055                .unwrap()
1056        );
1057
1058        let schema = DFSchema::from_unqualified_fields(
1059            vec![meta.add_to_field(Field::new("foo", DataType::Int32, true))].into(),
1060            HashMap::new(),
1061        )
1062        .unwrap();
1063
1064        // verify to_field method populates metadata
1065        assert_eq!(meta, expr.metadata(&schema).unwrap());
1066
1067        // outer ref constructed by `out_ref_col_with_metadata` should be metadata-preserving
1068        let outer_ref = out_ref_col_with_metadata(
1069            DataType::Int32,
1070            meta.to_hashmap(),
1071            Column::from_name("foo"),
1072        );
1073        assert_eq!(meta, outer_ref.metadata(&schema).unwrap());
1074    }
1075
1076    #[test]
1077    fn test_alias_metadata_is_preserved_in_field_metadata() {
1078        let schema = MockExprSchema::new().with_data_type(DataType::Int32);
1079        let alias_metadata = FieldMetadata::from(HashMap::from([(
1080            "some_key".to_string(),
1081            "some_value".to_string(),
1082        )]));
1083
1084        let Expr::Alias(alias) = col("foo").alias("alias") else {
1085            unreachable!();
1086        };
1087        let expr = Expr::Alias(alias.with_metadata(Some(alias_metadata.clone())));
1088
1089        let field = expr.to_field(&schema).unwrap().1;
1090        assert_eq!(
1091            field.metadata().get("some_key"),
1092            Some(&"some_value".to_string())
1093        );
1094        assert_eq!(expr.metadata(&schema).unwrap(), alias_metadata);
1095    }
1096
1097    #[test]
1098    fn test_expr_placeholder() {
1099        let schema = MockExprSchema::new();
1100
1101        let mut placeholder_meta = HashMap::new();
1102        placeholder_meta.insert("bar".to_string(), "buzz".to_string());
1103        let placeholder_meta = FieldMetadata::from(placeholder_meta);
1104
1105        let expr = Expr::Placeholder(Placeholder::new_with_field(
1106            "".to_string(),
1107            Some(
1108                Field::new("", DataType::Utf8, true)
1109                    .with_metadata(placeholder_meta.to_hashmap())
1110                    .into(),
1111            ),
1112        ));
1113
1114        let field = expr.to_field(&schema).unwrap().1;
1115        assert_eq!(
1116            (field.data_type(), field.is_nullable()),
1117            (&DataType::Utf8, true)
1118        );
1119        assert_eq!(placeholder_meta, expr.metadata(&schema).unwrap());
1120
1121        let expr_alias = expr.alias("a placeholder by any other name");
1122        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
1123        assert_eq!(
1124            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
1125            (&DataType::Utf8, true)
1126        );
1127        assert_eq!(placeholder_meta, expr_alias.metadata(&schema).unwrap());
1128
1129        // Non-nullable placeholder field should remain non-nullable
1130        let expr = Expr::Placeholder(Placeholder::new_with_field(
1131            "".to_string(),
1132            Some(Field::new("", DataType::Utf8, false).into()),
1133        ));
1134        let expr_field = expr.to_field(&schema).unwrap().1;
1135        assert_eq!(
1136            (expr_field.data_type(), expr_field.is_nullable()),
1137            (&DataType::Utf8, false)
1138        );
1139
1140        let expr_alias = expr.alias("a placeholder by any other name");
1141        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
1142        assert_eq!(
1143            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
1144            (&DataType::Utf8, false)
1145        );
1146    }
1147
    /// Minimal [`ExprSchema`] for these tests: every column lookup resolves to
    /// the same single `field`.
    #[derive(Debug)]
    struct MockExprSchema {
        /// Field returned for every column, whatever its name.
        field: FieldRef,
        /// When `true`, `nullable` lookups return an error instead of a value.
        error_on_nullable: bool,
    }
1153
1154    impl MockExprSchema {
1155        fn new() -> Self {
1156            Self {
1157                field: Arc::new(Field::new("mock_field", DataType::Null, false)),
1158                error_on_nullable: false,
1159            }
1160        }
1161
1162        fn with_nullable(mut self, nullable: bool) -> Self {
1163            Arc::make_mut(&mut self.field).set_nullable(nullable);
1164            self
1165        }
1166
1167        fn with_data_type(mut self, data_type: DataType) -> Self {
1168            Arc::make_mut(&mut self.field).set_data_type(data_type);
1169            self
1170        }
1171
1172        fn with_error_on_nullable(mut self, error_on_nullable: bool) -> Self {
1173            self.error_on_nullable = error_on_nullable;
1174            self
1175        }
1176
1177        fn with_metadata(mut self, metadata: FieldMetadata) -> Self {
1178            self.field =
1179                Arc::new(metadata.add_to_field(Arc::unwrap_or_clone(self.field)));
1180            self
1181        }
1182    }
1183
1184    impl ExprSchema for MockExprSchema {
1185        fn nullable(&self, _col: &Column) -> Result<bool> {
1186            assert_or_internal_err!(!self.error_on_nullable, "nullable error");
1187            Ok(self.field.is_nullable())
1188        }
1189
1190        fn field_from_column(&self, _col: &Column) -> Result<&FieldRef> {
1191            Ok(&self.field)
1192        }
1193    }
1194
1195    #[test]
1196    fn test_scalar_variable() {
1197        let mut meta = HashMap::new();
1198        meta.insert("bar".to_string(), "buzz".to_string());
1199        let meta = FieldMetadata::from(meta);
1200
1201        let field = Field::new("foo", DataType::Int32, true);
1202        let field = meta.add_to_field(field);
1203        let field = Arc::new(field);
1204
1205        let expr = Expr::ScalarVariable(field, vec!["foo".to_string()]);
1206
1207        let schema = MockExprSchema::new();
1208
1209        assert_eq!(meta, expr.metadata(&schema).unwrap());
1210    }
1211}