1use super::{Between, Expr, Like, predicate_bounds};
19use crate::ValueOrLambda;
20use crate::expr::{
21 AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
22 InSubquery, Lambda, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
23 WindowFunctionParams,
24};
25use crate::expr::{FieldMetadata, LambdaVariable};
26use crate::higher_order_function::HigherOrderReturnFieldArgs;
27use crate::type_coercion::functions::value_fields_with_higher_order_udf_and_lambdas;
28use crate::type_coercion::functions::{UDFCoercionExt, fields_with_udf};
29use crate::udf::ReturnFieldArgs;
30use crate::{LogicalPlan, Projection, Subquery, WindowFunctionDefinition, utils};
31use arrow::compute::can_cast_types;
32use arrow::datatypes::FieldRef;
33use arrow::datatypes::{DataType, Field};
34use datafusion_common::datatype::FieldExt;
35use datafusion_common::{
36 Column, DataFusionError, ExprSchema, Result, ScalarValue, Spans, TableReference,
37 not_impl_err, plan_datafusion_err, plan_err,
38};
39use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
40use datafusion_functions_window_common::field::WindowUDFFieldArgs;
41use std::sync::Arc;
42
/// Schema-derived properties of an expression.
///
/// Every method resolves the expression against an [`ExprSchema`], which
/// supplies the type, nullability and field of the columns it references.
pub trait ExprSchemable {
    /// Returns the [`DataType`] this expression produces, given `schema`.
    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType>;

    /// Returns whether this expression may produce NULL, given `input_schema`.
    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool>;

    /// Returns the [`FieldMetadata`] associated with this expression, given
    /// `schema`.
    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata>;

    /// Returns the output [`Field`] of this expression together with its
    /// optional table qualifier, given `input_schema`.
    fn to_field(
        &self,
        input_schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)>;

    /// Consumes the expression and returns an expression of type
    /// `cast_to_type`, or an error when no such conversion is available.
    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr>;

    /// Returns the data type and the nullability of this expression as a pair.
    ///
    /// Deprecated: read both values from the field returned by
    /// [`Self::to_field`] instead.
    #[deprecated(
        since = "51.0.0",
        note = "Use `to_field().1.is_nullable` and `to_field().1.data_type()` directly instead"
    )]
    fn data_type_and_nullable(&self, schema: &dyn ExprSchema)
    -> Result<(DataType, bool)>;
}
71
72fn cast_output_field(
75 source_field: &FieldRef,
76 target_type: &DataType,
77 force_nullable: bool,
78) -> Arc<Field> {
79 let mut f = source_field
80 .as_ref()
81 .clone()
82 .with_data_type(target_type.clone())
83 .with_metadata(source_field.metadata().clone());
84 if force_nullable {
85 f = f.with_nullable(true);
86 }
87 Arc::new(f)
88}
89
90impl ExprSchemable for Expr {
91 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
132 fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
133 match self {
134 Expr::Alias(Alias { expr, name, .. }) => match &**expr {
135 Expr::Placeholder(Placeholder { field, .. }) => match &field {
136 None => schema.data_type(&Column::from_name(name)).cloned(),
137 Some(field) => Ok(field.data_type().clone()),
138 },
139 _ => expr.get_type(schema),
140 },
141 Expr::Negative(expr) => expr.get_type(schema),
142 Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
143 Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
144 Expr::ScalarVariable(field, _) => Ok(field.data_type().clone()),
145 Expr::Literal(l, _) => Ok(l.data_type()),
146 Expr::Case(case) => {
147 for (_, then_expr) in &case.when_then_expr {
148 let then_type = then_expr.get_type(schema)?;
149 if !then_type.is_null() {
150 return Ok(then_type);
151 }
152 }
153 case.else_expr
154 .as_ref()
155 .map_or(Ok(DataType::Null), |e| e.get_type(schema))
156 }
157 Expr::Cast(Cast { field, .. }) | Expr::TryCast(TryCast { field, .. }) => {
158 Ok(field.data_type().clone())
159 }
160 Expr::Unnest(Unnest { expr }) => {
161 let arg_data_type = expr.get_type(schema)?;
162 match arg_data_type {
164 DataType::List(field)
165 | DataType::LargeList(field)
166 | DataType::FixedSizeList(field, _)
167 | DataType::ListView(field)
168 | DataType::LargeListView(field) => Ok(field.data_type().clone()),
169 DataType::Struct(_) => Ok(arg_data_type),
170 DataType::Null => {
171 not_impl_err!("unnest() does not support null yet")
172 }
173 _ => {
174 plan_err!(
175 "unnest() can only be applied to array, struct and null"
176 )
177 }
178 }
179 }
180 Expr::ScalarFunction(_)
181 | Expr::WindowFunction(_)
182 | Expr::AggregateFunction(_) => {
183 Ok(self.to_field(schema)?.1.data_type().clone())
184 }
185 Expr::Not(_)
186 | Expr::IsNull(_)
187 | Expr::Exists { .. }
188 | Expr::InSubquery(_)
189 | Expr::SetComparison(_)
190 | Expr::Between { .. }
191 | Expr::InList { .. }
192 | Expr::IsNotNull(_)
193 | Expr::IsTrue(_)
194 | Expr::IsFalse(_)
195 | Expr::IsUnknown(_)
196 | Expr::IsNotTrue(_)
197 | Expr::IsNotFalse(_)
198 | Expr::IsNotUnknown(_) => Ok(DataType::Boolean),
199 Expr::ScalarSubquery(subquery) => {
200 Ok(subquery.subquery.schema().field(0).data_type().clone())
201 }
202 Expr::BinaryExpr(BinaryExpr { left, right, op }) => BinaryTypeCoercer::new(
203 &left.get_type(schema)?,
204 op,
205 &right.get_type(schema)?,
206 )
207 .get_result_type(),
208 Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
209 Expr::Placeholder(Placeholder { field, .. }) => {
210 if let Some(field) = field {
211 Ok(field.data_type().clone())
212 } else {
213 Ok(DataType::Null)
216 }
217 }
218 #[expect(deprecated)]
219 Expr::Wildcard { .. } => Ok(DataType::Null),
220 Expr::GroupingSet(_) => {
221 Ok(DataType::Null)
223 }
224 Expr::HigherOrderFunction(_func) => {
225 Ok(self.to_field(schema)?.1.data_type().clone())
226 }
227 Expr::Lambda(_lambda) => Ok(DataType::Null),
228 Expr::LambdaVariable(LambdaVariable { field, .. }) => match field {
229 Some(f) => Ok(f.data_type().clone()),
230 None => Ok(DataType::Null),
233 },
234 }
235 }
236
    /// Returns whether this expression may evaluate to NULL, given
    /// `input_schema`.
    ///
    /// The answer is conservative: `true` means "may be NULL". Column
    /// nullability comes from `input_schema`; literals are nullable only when
    /// they are NULL; most composite expressions are nullable when any of
    /// their operands is.
    ///
    /// # Errors
    ///
    /// Propagates errors from schema lookups, from nested expressions, from
    /// [`Self::to_field`] and from the predicate-bounds analysis used for
    /// `CASE`.
    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool> {
        match self {
            // These wrappers are exactly as nullable as the wrapped expression.
            Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) | Expr::Negative(expr) => {
                expr.nullable(input_schema)
            }

            Expr::InList(InList { expr, list, .. }) => {
                // Upper bound on how many expressions (the tested expression
                // plus list elements) are inspected.
                const MAX_INSPECT_LIMIT: usize = 6;
                // `Some(())` as soon as one inspected expression is nullable;
                // an error from any inspected expression is propagated.
                let has_nullable = std::iter::once(expr.as_ref())
                    .chain(list)
                    .take(MAX_INSPECT_LIMIT)
                    .find_map(|e| {
                        e.nullable(input_schema)
                            .map(|nullable| if nullable { Some(()) } else { None })
                            .transpose()
                    })
                    .transpose()?;
                Ok(match has_nullable {
                    // The tested expression or a list element is nullable.
                    Some(_) => true,
                    // Not everything was inspected: assume nullable.
                    None if list.len() + 1 > MAX_INSPECT_LIMIT => true,
                    // Everything was inspected and nothing is nullable.
                    _ => false,
                })
            }

            Expr::Between(Between {
                expr, low, high, ..
            }) => Ok(expr.nullable(input_schema)?
                || low.nullable(input_schema)?
                || high.nullable(input_schema)?),

            Expr::Column(c) => input_schema.nullable(c),
            Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
            Expr::Literal(value, _) => Ok(value.is_null()),
            Expr::Case(case) => {
                // Look for the first THEN branch that is nullable and that
                // cannot be ruled out as unreachable; `Some(Err(_))` carries an
                // error encountered while looking.
                let nullable_then = case
                    .when_then_expr
                    .iter()
                    .filter_map(|(w, t)| {
                        let is_nullable = match t.nullable(input_schema) {
                            Err(e) => return Some(Err(e)),
                            Ok(n) => n,
                        };

                        // A non-nullable THEN cannot make the CASE nullable.
                        if !is_nullable {
                            return None;
                        }

                        // For `CASE <expr> WHEN ...` no reachability analysis is
                        // attempted: a nullable THEN counts immediately.
                        if case.expr.is_some() {
                            return Some(Ok(()));
                        }

                        // Compute the possible values of the WHEN predicate.
                        // NOTE(review): the second argument is presumably the
                        // expression assumed to be NULL during the analysis —
                        // confirm against `predicate_bounds::evaluate_bounds`.
                        let bounds = match predicate_bounds::evaluate_bounds(
                            w,
                            Some(unwrap_certainly_null_expr(t)),
                            input_schema,
                        ) {
                            Err(e) => return Some(Err(e)),
                            Ok(b) => b,
                        };

                        let can_be_true = match bounds
                            .contains_value(ScalarValue::Boolean(Some(true)))
                        {
                            Err(e) => return Some(Err(e)),
                            Ok(b) => b,
                        };

                        if !can_be_true {
                            // The predicate can never be true here, so this
                            // branch does not contribute to nullability.
                            None
                        } else {
                            // The branch could not be ruled out.
                            Some(Ok(()))
                        }
                    })
                    .next();

                if let Some(nullable_then) = nullable_then {
                    // A nullable THEN branch that may be taken was found (or an
                    // error occurred, which is propagated).
                    nullable_then.map(|_| true)
                } else if let Some(e) = &case.else_expr {
                    // Otherwise nullability is decided by the ELSE expression.
                    e.nullable(input_schema)
                } else {
                    // Without an ELSE the CASE is reported as nullable.
                    Ok(true)
                }
            }
            Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
            Expr::ScalarFunction(_)
            | Expr::AggregateFunction(_)
            | Expr::WindowFunction(_) => Ok(self.to_field(input_schema)?.1.is_nullable()),
            Expr::ScalarVariable(field, _) => Ok(field.is_nullable()),
            Expr::TryCast { .. } | Expr::Unnest(_) | Expr::Placeholder(_) => Ok(true),
            // These predicates are reported as never NULL.
            Expr::IsNull(_)
            | Expr::IsNotNull(_)
            | Expr::IsTrue(_)
            | Expr::IsFalse(_)
            | Expr::IsUnknown(_)
            | Expr::IsNotTrue(_)
            | Expr::IsNotFalse(_)
            | Expr::IsNotUnknown(_)
            | Expr::Exists { .. } => Ok(false),
            Expr::SetComparison(_) => Ok(true),
            Expr::InSubquery(InSubquery { expr, .. }) => expr.nullable(input_schema),
            Expr::ScalarSubquery(subquery) => {
                // Nullability of the first column of the subquery schema.
                Ok(subquery.subquery.schema().field(0).is_nullable())
            }
            Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
                Ok(left.nullable(input_schema)? || right.nullable(input_schema)?)
            }
            Expr::Like(Like { expr, pattern, .. })
            | Expr::SimilarTo(Like { expr, pattern, .. }) => {
                Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
            }
            #[expect(deprecated)]
            Expr::Wildcard { .. } => Ok(false),
            Expr::GroupingSet(_) => {
                Ok(true)
            }
            Expr::HigherOrderFunction(_func) => {
                Ok(self.to_field(input_schema)?.1.is_nullable())
            }
            Expr::Lambda(_lambda) => Ok(true),
            Expr::LambdaVariable(LambdaVariable { field, .. }) => match field {
                Some(f) => Ok(f.is_nullable()),
                // Without an attached field, report nullable.
                None => Ok(true),
            },
        }
    }
399
400 fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata> {
401 self.to_field(schema)
402 .map(|(_, field)| FieldMetadata::from(field.metadata()))
403 }
404
405 fn data_type_and_nullable(
416 &self,
417 schema: &dyn ExprSchema,
418 ) -> Result<(DataType, bool)> {
419 let field = self.to_field(schema)?.1;
420
421 Ok((field.data_type().clone(), field.is_nullable()))
422 }
423
424 fn to_field(
475 &self,
476 schema: &dyn ExprSchema,
477 ) -> Result<(Option<TableReference>, Arc<Field>)> {
478 let (relation, schema_name) = self.qualified_name();
479 #[expect(deprecated)]
480 let field = match self {
481 Expr::Alias(Alias {
482 expr,
483 name: _,
484 metadata,
485 ..
486 }) => {
487 let mut combined_metadata = expr.metadata(schema)?;
488 if let Some(metadata) = metadata {
489 combined_metadata.extend(metadata.clone());
490 }
491
492 Ok(expr
493 .to_field(schema)
494 .map(|(_, f)| f)?
495 .with_field_metadata(&combined_metadata))
496 }
497 Expr::Negative(expr) => expr.to_field(schema).map(|(_, f)| f),
498 Expr::Column(c) => schema.field_from_column(c).map(Arc::clone),
499 Expr::OuterReferenceColumn(field, _) => {
500 Ok(Arc::clone(field).renamed(&schema_name))
501 }
502 Expr::ScalarVariable(field, _) => Ok(Arc::clone(field).renamed(&schema_name)),
503 Expr::Literal(l, metadata) => Ok(Arc::new(
504 Field::new(&schema_name, l.data_type(), l.is_null())
505 .with_field_metadata_opt(metadata.as_ref()),
506 )),
507 Expr::IsNull(_)
508 | Expr::IsNotNull(_)
509 | Expr::IsTrue(_)
510 | Expr::IsFalse(_)
511 | Expr::IsUnknown(_)
512 | Expr::IsNotTrue(_)
513 | Expr::IsNotFalse(_)
514 | Expr::IsNotUnknown(_)
515 | Expr::Exists { .. } => {
516 Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false)))
517 }
518 Expr::ScalarSubquery(subquery) => {
519 Ok(Arc::clone(&subquery.subquery.schema().fields()[0]))
520 }
521 Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
522 let (left_field, right_field) =
523 (left.to_field(schema)?.1, right.to_field(schema)?.1);
524
525 let (lhs_type, lhs_nullable) =
526 (left_field.data_type(), left_field.is_nullable());
527 let (rhs_type, rhs_nullable) =
528 (right_field.data_type(), right_field.is_nullable());
529 let mut coercer = BinaryTypeCoercer::new(lhs_type, op, rhs_type);
530 coercer.set_lhs_spans(left.spans().cloned().unwrap_or_default());
531 coercer.set_rhs_spans(right.spans().cloned().unwrap_or_default());
532 Ok(Arc::new(Field::new(
533 &schema_name,
534 coercer.get_result_type()?,
535 lhs_nullable || rhs_nullable,
536 )))
537 }
538 Expr::WindowFunction(window_function) => {
539 let WindowFunction {
540 fun,
541 params: WindowFunctionParams { args, .. },
542 ..
543 } = window_function.as_ref();
544
545 let fields = args
546 .iter()
547 .map(|e| e.to_field(schema).map(|(_, f)| f))
548 .collect::<Result<Vec<_>>>()?;
549 match fun {
550 WindowFunctionDefinition::AggregateUDF(udaf) => {
551 let new_fields =
552 verify_function_arguments(udaf.as_ref(), &fields)?;
553 let return_field = udaf.return_field(&new_fields)?;
554 Ok(return_field)
555 }
556 WindowFunctionDefinition::WindowUDF(udwf) => {
557 let new_fields =
558 verify_function_arguments(udwf.as_ref(), &fields)?;
559 let return_field = udwf
560 .field(WindowUDFFieldArgs::new(&new_fields, &schema_name))?;
561 Ok(return_field)
562 }
563 }
564 }
565 Expr::AggregateFunction(AggregateFunction {
566 func,
567 params: AggregateFunctionParams { args, .. },
568 }) => {
569 let fields = args
570 .iter()
571 .map(|e| e.to_field(schema).map(|(_, f)| f))
572 .collect::<Result<Vec<_>>>()?;
573 let new_fields = verify_function_arguments(func.as_ref(), &fields)?;
574 func.return_field(&new_fields)
575 }
576 Expr::ScalarFunction(ScalarFunction { func, args }) => {
577 let fields = args
578 .iter()
579 .map(|e| e.to_field(schema).map(|(_, f)| f))
580 .collect::<Result<Vec<_>>>()?;
581 let new_fields = verify_function_arguments(func.as_ref(), &fields)?;
582
583 let arguments = args
584 .iter()
585 .map(|e| match e {
586 Expr::Literal(sv, _) => Some(sv),
587 _ => None,
588 })
589 .collect::<Vec<_>>();
590 let args = ReturnFieldArgs {
591 arg_fields: &new_fields,
592 scalar_arguments: &arguments,
593 };
594
595 func.return_field_from_args(args)
596 }
597 Expr::Cast(Cast { expr, field }) => {
599 expr.to_field(schema).map(|(_table_ref, src)| {
600 cast_output_field(&src, field.data_type(), false)
601 })
602 }
603 Expr::Placeholder(Placeholder {
604 id: _,
605 field: Some(field),
606 }) => Ok(Arc::clone(field).renamed(&schema_name)),
607 Expr::TryCast(TryCast { expr, field }) => {
608 expr.to_field(schema).map(|(_table_ref, src)| {
609 cast_output_field(&src, field.data_type(), true)
610 })
611 }
612 Expr::LambdaVariable(LambdaVariable {
613 field: Some(field), ..
614 }) => Ok(Arc::clone(field).renamed(&schema_name)),
615 Expr::Like(_)
616 | Expr::SimilarTo(_)
617 | Expr::Not(_)
618 | Expr::Between(_)
619 | Expr::Case(_)
620 | Expr::InList(_)
621 | Expr::InSubquery(_)
622 | Expr::SetComparison(_)
623 | Expr::Wildcard { .. }
624 | Expr::GroupingSet(_)
625 | Expr::Placeholder(_)
626 | Expr::Unnest(_)
627 | Expr::Lambda(_)
628 | Expr::LambdaVariable(_) => Ok(Arc::new(Field::new(
629 &schema_name,
630 self.get_type(schema)?,
631 self.nullable(schema)?,
632 ))),
633 Expr::HigherOrderFunction(func) => {
634 let arg_fields = func
635 .args
636 .iter()
637 .map(|arg| match arg {
638 Expr::Lambda(Lambda { params: _, body }) => {
639 Ok(ValueOrLambda::Lambda(Arc::new(Field::new(
641 arg.qualified_name().1,
642 body.get_type(schema)?,
643 body.nullable(schema)?,
644 ))))
645 }
646 _ => Ok(ValueOrLambda::Value(arg.to_field(schema)?.1)),
647 })
648 .collect::<Result<Vec<_>>>()?;
649
650 let new_fields = value_fields_with_higher_order_udf_and_lambdas(
651 &arg_fields,
652 func.func.as_ref(),
653 )?;
654
655 let arguments = func
656 .args
657 .iter()
658 .map(|e| match e {
659 Expr::Literal(sv, _) => Some(sv),
660 _ => None,
661 })
662 .collect::<Vec<_>>();
663
664 let args = HigherOrderReturnFieldArgs {
665 arg_fields: &new_fields,
666 scalar_arguments: &arguments,
667 };
668
669 func.func.return_field_from_args(args)
670 }
671 }?;
672
673 Ok((
674 relation,
675 field.renamed(&schema_name),
677 ))
678 }
679
680 fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr> {
687 let this_type = self.get_type(schema)?;
688 if this_type == *cast_to_type {
689 return Ok(self);
690 }
691
692 let can_cast = match (&this_type, cast_to_type) {
698 (DataType::Struct(_), DataType::Struct(_)) => {
699 true
701 }
702 _ => can_cast_types(&this_type, cast_to_type),
703 };
704
705 if can_cast {
706 match self {
707 Expr::ScalarSubquery(subquery) => {
708 Ok(Expr::ScalarSubquery(cast_subquery(subquery, cast_to_type)?))
709 }
710 _ => Ok(Expr::Cast(Cast::new(Box::new(self), cast_to_type.clone()))),
711 }
712 } else {
713 plan_err!("Cannot automatically convert {this_type} to {cast_to_type}")
714 }
715 }
716}
717
718fn verify_function_arguments<F: UDFCoercionExt>(
721 function: &F,
722 input_fields: &[FieldRef],
723) -> Result<Vec<FieldRef>> {
724 fields_with_udf(input_fields, function).map_err(|err| {
725 let data_types = input_fields
726 .iter()
727 .map(|f| f.data_type())
728 .cloned()
729 .collect::<Vec<_>>();
730 plan_datafusion_err!(
731 "{}. {}",
732 match err {
733 DataFusionError::Plan(msg) => msg,
734 err => err.to_string(),
735 },
736 utils::generate_signature_error_message(
737 function.name(),
738 function.signature(),
739 &data_types
740 )
741 )
742 })
743}
744
745fn unwrap_certainly_null_expr(expr: &Expr) -> &Expr {
747 match expr {
748 Expr::Not(e) => unwrap_certainly_null_expr(e),
749 Expr::Negative(e) => unwrap_certainly_null_expr(e),
750 Expr::Cast(e) => unwrap_certainly_null_expr(e.expr.as_ref()),
751 _ => expr,
752 }
753}
754
755pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
763 if subquery.subquery.schema().field(0).data_type() == cast_to_type {
764 return Ok(subquery);
765 }
766
767 let plan = subquery.subquery.as_ref();
768 let new_plan = match plan {
769 LogicalPlan::Projection(projection) => {
770 let cast_expr = projection.expr[0]
771 .clone()
772 .cast_to(cast_to_type, projection.input.schema())?;
773 LogicalPlan::Projection(Projection::try_new(
774 vec![cast_expr],
775 Arc::clone(&projection.input),
776 )?)
777 }
778 _ => {
779 let cast_expr = Expr::Column(Column::from(plan.schema().qualified_field(0)))
780 .cast_to(cast_to_type, subquery.subquery.schema())?;
781 LogicalPlan::Projection(Projection::try_new(
782 vec![cast_expr],
783 subquery.subquery,
784 )?)
785 }
786 };
787 Ok(Subquery {
788 subquery: Arc::new(new_plan),
789 outer_ref_columns: subquery.outer_ref_columns,
790 spans: Spans::new(),
791 })
792}
793
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::{and, col, lit, not, or, out_ref_col_with_metadata, when};

    use datafusion_common::{DFSchema, assert_or_internal_err};

    /// Asserts that applying the given `IS ...` predicate builder to a NULL
    /// literal yields an expression that is not nullable.
    macro_rules! test_is_expr_nullable {
        ($EXPR_TYPE:ident) => {{
            let expr = lit(ScalarValue::Null).$EXPR_TYPE();
            assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        }};
    }

    #[test]
    fn expr_schema_nullability() {
        // A comparison is exactly as nullable as its column operand.
        let expr = col("foo").eq(lit(1));
        assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        assert!(
            expr.nullable(&MockExprSchema::new().with_nullable(true))
                .unwrap()
        );

        test_is_expr_nullable!(is_null);
        test_is_expr_nullable!(is_not_null);
        test_is_expr_nullable!(is_true);
        test_is_expr_nullable!(is_not_true);
        test_is_expr_nullable!(is_false);
        test_is_expr_nullable!(is_not_false);
        test_is_expr_nullable!(is_unknown);
        test_is_expr_nullable!(is_not_unknown);
    }

    #[test]
    fn test_between_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        let expr = col("foo").between(lit(1), lit(2));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        let null = lit(ScalarValue::Int32(None));

        // A NULL bound makes BETWEEN nullable even over a non-nullable column.
        let expr = col("foo").between(null.clone(), lit(2));
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(lit(1), null.clone());
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(null.clone(), null);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    /// Asserts that `expr.nullable(schema)` equals `expected`.
    fn assert_nullability(expr: &Expr, schema: &dyn ExprSchema, expected: bool) {
        assert_eq!(
            expr.nullable(schema).unwrap(),
            expected,
            "Nullability of '{expr}' should be {expected}"
        );
    }

    /// Asserts that `expr` is reported as not nullable under `schema`.
    fn assert_not_nullable(expr: &Expr, schema: &dyn ExprSchema) {
        assert_nullability(expr, schema, false);
    }

    /// Asserts that `expr` is reported as nullable under `schema`.
    fn assert_nullable(expr: &Expr, schema: &dyn ExprSchema) {
        assert_nullability(expr, schema, true);
    }

    #[test]
    fn test_case_expression_nullability() -> Result<()> {
        let nullable_schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_nullable(true);

        let not_nullable_schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_nullable(false);

        // CASE WHEN x IS NOT NULL THEN x ELSE 0
        let e = when(col("x").is_not_null(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN NOT x IS NULL THEN x ELSE 0
        let e = when(not(col("x").is_null()), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 THEN x ELSE 0
        let e = when(col("x").eq(lit(5)), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT NULL AND x = 5 THEN x ELSE 0
        let e = when(and(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 AND x IS NOT NULL THEN x ELSE 0
        let e = when(and(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT NULL OR x = 5 THEN x ELSE 0
        let e = when(or(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 OR x IS NOT NULL THEN x ELSE 0
        let e = when(or(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN (x = 5 AND x IS NOT NULL) OR (x = bar AND x IS NOT NULL) THEN x ELSE 0
        let e = when(
            or(
                and(col("x").eq(lit(5)), col("x").is_not_null()),
                and(col("x").eq(col("bar")), col("x").is_not_null()),
            ),
            col("x"),
        )
        .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 OR x IS NULL THEN x ELSE 0
        let e = when(or(col("x").eq(lit(5)), col("x").is_null()), col("x"))
            .otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS TRUE THEN x ELSE 0
        let e = when(col("x").is_true(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT TRUE THEN x ELSE 0
        let e = when(col("x").is_not_true(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS FALSE THEN x ELSE 0
        let e = when(col("x").is_false(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT FALSE THEN x ELSE 0
        let e = when(col("x").is_not_false(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS UNKNOWN THEN x ELSE 0
        let e = when(col("x").is_unknown(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT UNKNOWN THEN x ELSE 0
        let e = when(col("x").is_not_unknown(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x LIKE 'x' THEN x ELSE 0
        let e = when(col("x").like(lit("x")), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN 0 THEN x ELSE 0
        let e = when(lit(0), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN 1 THEN x ELSE 0
        let e = when(lit(1), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        Ok(())
    }

    #[test]
    fn test_inlist_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        let expr = col("foo").in_list(vec![lit(1); 5], false);
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());
        // An error raised by the schema is propagated by `nullable()`.
        assert!(
            expr.nullable(&get_schema(false).with_error_on_nullable(true))
                .is_err()
        );

        let null = lit(ScalarValue::Int32(None));
        let expr = col("foo").in_list(vec![null, lit(1)], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());

        // A list too long to be fully inspected is reported as nullable.
        let expr = col("foo").in_list(vec![lit(1); 6], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    #[test]
    fn test_like_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Utf8)
                .with_nullable(nullable)
        };

        let expr = col("foo").like(lit("bar"));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        // A NULL pattern makes LIKE nullable.
        let expr = col("foo").like(lit(ScalarValue::Utf8(None)));
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    #[test]
    fn expr_schema_data_type() {
        let expr = col("foo");
        assert_eq!(
            DataType::Utf8,
            expr.get_type(&MockExprSchema::new().with_data_type(DataType::Utf8))
                .unwrap()
        );
    }

    #[test]
    fn test_expr_metadata() {
        let mut meta = HashMap::new();
        meta.insert("bar".to_string(), "buzz".to_string());
        let meta = FieldMetadata::from(meta);
        let expr = col("foo");
        let schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_metadata(meta.clone());

        // Column, alias and cast all preserve the column metadata.
        assert_eq!(meta, expr.metadata(&schema).unwrap());
        assert_eq!(meta, expr.clone().alias("bar").metadata(&schema).unwrap());
        assert_eq!(
            meta,
            expr.clone()
                .cast_to(&DataType::Int64, &schema)
                .unwrap()
                .metadata(&schema)
                .unwrap()
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![meta.add_to_field(Field::new("foo", DataType::Int32, true))].into(),
            HashMap::new(),
        )
        .unwrap();

        // The same holds when the schema is a `DFSchema`.
        assert_eq!(meta, expr.metadata(&schema).unwrap());

        // An outer reference built with metadata reports that metadata.
        let outer_ref = out_ref_col_with_metadata(
            DataType::Int32,
            meta.to_hashmap(),
            Column::from_name("foo"),
        );
        assert_eq!(meta, outer_ref.metadata(&schema).unwrap());
    }

    #[test]
    fn test_alias_metadata_is_preserved_in_field_metadata() {
        let schema = MockExprSchema::new().with_data_type(DataType::Int32);
        let alias_metadata = FieldMetadata::from(HashMap::from([(
            "some_key".to_string(),
            "some_value".to_string(),
        )]));

        let Expr::Alias(alias) = col("foo").alias("alias") else {
            unreachable!();
        };
        let expr = Expr::Alias(alias.with_metadata(Some(alias_metadata.clone())));

        let field = expr.to_field(&schema).unwrap().1;
        assert_eq!(
            field.metadata().get("some_key"),
            Some(&"some_value".to_string())
        );
        assert_eq!(expr.metadata(&schema).unwrap(), alias_metadata);
    }

    #[test]
    fn test_expr_placeholder() {
        let schema = MockExprSchema::new();

        let mut placeholder_meta = HashMap::new();
        placeholder_meta.insert("bar".to_string(), "buzz".to_string());
        let placeholder_meta = FieldMetadata::from(placeholder_meta);

        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(
                Field::new("", DataType::Utf8, true)
                    .with_metadata(placeholder_meta.to_hashmap())
                    .into(),
            ),
        ));

        let field = expr.to_field(&schema).unwrap().1;
        assert_eq!(
            (field.data_type(), field.is_nullable()),
            (&DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr.metadata(&schema).unwrap());

        let expr_alias = expr.alias("a placeholder by any other name");
        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
            (&DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr_alias.metadata(&schema).unwrap());

        // A non-nullable placeholder field stays non-nullable, with or
        // without an alias.
        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(Field::new("", DataType::Utf8, false).into()),
        ));
        let expr_field = expr.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_field.data_type(), expr_field.is_nullable()),
            (&DataType::Utf8, false)
        );

        let expr_alias = expr.alias("a placeholder by any other name");
        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
            (&DataType::Utf8, false)
        );
    }

    /// Minimal `ExprSchema` that answers every column lookup with the same
    /// field, and can be told to fail nullability lookups.
    #[derive(Debug)]
    struct MockExprSchema {
        /// Field returned for every column.
        field: FieldRef,
        /// When `true`, `nullable()` returns an error instead of a value.
        error_on_nullable: bool,
    }

    impl MockExprSchema {
        /// Creates a schema whose field is a non-nullable `Null`-typed field.
        fn new() -> Self {
            Self {
                field: Arc::new(Field::new("mock_field", DataType::Null, false)),
                error_on_nullable: false,
            }
        }

        /// Sets the nullability of the mock field.
        fn with_nullable(mut self, nullable: bool) -> Self {
            Arc::make_mut(&mut self.field).set_nullable(nullable);
            self
        }

        /// Sets the data type of the mock field.
        fn with_data_type(mut self, data_type: DataType) -> Self {
            Arc::make_mut(&mut self.field).set_data_type(data_type);
            self
        }

        /// Makes `nullable()` fail when `error_on_nullable` is `true`.
        fn with_error_on_nullable(mut self, error_on_nullable: bool) -> Self {
            self.error_on_nullable = error_on_nullable;
            self
        }

        /// Attaches `metadata` to the mock field.
        fn with_metadata(mut self, metadata: FieldMetadata) -> Self {
            self.field =
                Arc::new(metadata.add_to_field(Arc::unwrap_or_clone(self.field)));
            self
        }
    }

    impl ExprSchema for MockExprSchema {
        fn nullable(&self, _col: &Column) -> Result<bool> {
            assert_or_internal_err!(!self.error_on_nullable, "nullable error");
            Ok(self.field.is_nullable())
        }

        fn field_from_column(&self, _col: &Column) -> Result<&FieldRef> {
            Ok(&self.field)
        }
    }

    #[test]
    fn test_scalar_variable() {
        let mut meta = HashMap::new();
        meta.insert("bar".to_string(), "buzz".to_string());
        let meta = FieldMetadata::from(meta);

        let field = Field::new("foo", DataType::Int32, true);
        let field = meta.add_to_field(field);
        let field = Arc::new(field);

        let expr = Expr::ScalarVariable(field, vec!["foo".to_string()]);

        let schema = MockExprSchema::new();

        // A scalar variable reports the metadata of the field it carries.
        assert_eq!(meta, expr.metadata(&schema).unwrap());
    }
}