1use super::{Between, Expr, Like, predicate_bounds};
19use crate::expr::{
20 AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
21 InSubquery, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
22 WindowFunctionParams,
23};
24use crate::type_coercion::functions::fields_with_udf;
25use crate::udf::ReturnFieldArgs;
26use crate::{LogicalPlan, Projection, Subquery, WindowFunctionDefinition, utils};
27use arrow::compute::can_cast_types;
28use arrow::datatypes::{DataType, Field};
29use datafusion_common::datatype::FieldExt;
30use datafusion_common::metadata::FieldMetadata;
31use datafusion_common::{
32 Column, DataFusionError, ExprSchema, Result, ScalarValue, Spans, TableReference,
33 not_impl_err, plan_datafusion_err, plan_err,
34};
35use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
36use datafusion_functions_window_common::field::WindowUDFFieldArgs;
37use std::sync::Arc;
38
39pub trait ExprSchemable {
41 fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType>;
43
44 fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool>;
46
47 fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata>;
49
50 fn to_field(
52 &self,
53 input_schema: &dyn ExprSchema,
54 ) -> Result<(Option<TableReference>, Arc<Field>)>;
55
56 fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr>;
58
59 #[deprecated(
61 since = "51.0.0",
62 note = "Use `to_field().1.is_nullable` and `to_field().1.data_type()` directly instead"
63 )]
64 fn data_type_and_nullable(&self, schema: &dyn ExprSchema)
65 -> Result<(DataType, bool)>;
66}
67
68impl ExprSchemable for Expr {
69 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
110 fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
111 match self {
112 Expr::Alias(Alias { expr, name, .. }) => match &**expr {
113 Expr::Placeholder(Placeholder { field, .. }) => match &field {
114 None => schema.data_type(&Column::from_name(name)).cloned(),
115 Some(field) => Ok(field.data_type().clone()),
116 },
117 _ => expr.get_type(schema),
118 },
119 Expr::Negative(expr) => expr.get_type(schema),
120 Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
121 Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
122 Expr::ScalarVariable(field, _) => Ok(field.data_type().clone()),
123 Expr::Literal(l, _) => Ok(l.data_type()),
124 Expr::Case(case) => {
125 for (_, then_expr) in &case.when_then_expr {
126 let then_type = then_expr.get_type(schema)?;
127 if !then_type.is_null() {
128 return Ok(then_type);
129 }
130 }
131 case.else_expr
132 .as_ref()
133 .map_or(Ok(DataType::Null), |e| e.get_type(schema))
134 }
135 Expr::Cast(Cast { data_type, .. })
136 | Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
137 Expr::Unnest(Unnest { expr }) => {
138 let arg_data_type = expr.get_type(schema)?;
139 match arg_data_type {
141 DataType::List(field)
142 | DataType::LargeList(field)
143 | DataType::FixedSizeList(field, _) => Ok(field.data_type().clone()),
144 DataType::Struct(_) => Ok(arg_data_type),
145 DataType::Null => {
146 not_impl_err!("unnest() does not support null yet")
147 }
148 _ => {
149 plan_err!(
150 "unnest() can only be applied to array, struct and null"
151 )
152 }
153 }
154 }
155 Expr::ScalarFunction(_func) => {
156 let return_type = self.to_field(schema)?.1.data_type().clone();
157 Ok(return_type)
158 }
159 Expr::WindowFunction(window_function) => self
160 .data_type_and_nullable_with_window_function(schema, window_function)
161 .map(|(return_type, _)| return_type),
162 Expr::AggregateFunction(AggregateFunction {
163 func,
164 params: AggregateFunctionParams { args, .. },
165 }) => {
166 let fields = args
167 .iter()
168 .map(|e| e.to_field(schema).map(|(_, f)| f))
169 .collect::<Result<Vec<_>>>()?;
170 let new_fields = fields_with_udf(&fields, func.as_ref())
171 .map_err(|err| {
172 let data_types = fields
173 .iter()
174 .map(|f| f.data_type().clone())
175 .collect::<Vec<_>>();
176 plan_datafusion_err!(
177 "{} {}",
178 match err {
179 DataFusionError::Plan(msg) => msg,
180 err => err.to_string(),
181 },
182 utils::generate_signature_error_msg(
183 func.name(),
184 func.signature().clone(),
185 &data_types
186 )
187 )
188 })?
189 .into_iter()
190 .collect::<Vec<_>>();
191 Ok(func.return_field(&new_fields)?.data_type().clone())
192 }
193 Expr::Not(_)
194 | Expr::IsNull(_)
195 | Expr::Exists { .. }
196 | Expr::InSubquery(_)
197 | Expr::Between { .. }
198 | Expr::InList { .. }
199 | Expr::IsNotNull(_)
200 | Expr::IsTrue(_)
201 | Expr::IsFalse(_)
202 | Expr::IsUnknown(_)
203 | Expr::IsNotTrue(_)
204 | Expr::IsNotFalse(_)
205 | Expr::IsNotUnknown(_) => Ok(DataType::Boolean),
206 Expr::ScalarSubquery(subquery) => {
207 Ok(subquery.subquery.schema().field(0).data_type().clone())
208 }
209 Expr::BinaryExpr(BinaryExpr { left, right, op }) => BinaryTypeCoercer::new(
210 &left.get_type(schema)?,
211 op,
212 &right.get_type(schema)?,
213 )
214 .get_result_type(),
215 Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
216 Expr::Placeholder(Placeholder { field, .. }) => {
217 if let Some(field) = field {
218 Ok(field.data_type().clone())
219 } else {
220 Ok(DataType::Null)
223 }
224 }
225 #[expect(deprecated)]
226 Expr::Wildcard { .. } => Ok(DataType::Null),
227 Expr::GroupingSet(_) => {
228 Ok(DataType::Null)
230 }
231 }
232 }
233
234 fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool> {
246 match self {
247 Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) | Expr::Negative(expr) => {
248 expr.nullable(input_schema)
249 }
250
251 Expr::InList(InList { expr, list, .. }) => {
252 const MAX_INSPECT_LIMIT: usize = 6;
254 let has_nullable = std::iter::once(expr.as_ref())
256 .chain(list)
257 .take(MAX_INSPECT_LIMIT)
258 .find_map(|e| {
259 e.nullable(input_schema)
260 .map(|nullable| if nullable { Some(()) } else { None })
261 .transpose()
262 })
263 .transpose()?;
264 Ok(match has_nullable {
265 Some(_) => true,
267 None if list.len() + 1 > MAX_INSPECT_LIMIT => true,
269 _ => false,
271 })
272 }
273
274 Expr::Between(Between {
275 expr, low, high, ..
276 }) => Ok(expr.nullable(input_schema)?
277 || low.nullable(input_schema)?
278 || high.nullable(input_schema)?),
279
280 Expr::Column(c) => input_schema.nullable(c),
281 Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
282 Expr::Literal(value, _) => Ok(value.is_null()),
283 Expr::Case(case) => {
284 let nullable_then = case
285 .when_then_expr
286 .iter()
287 .filter_map(|(w, t)| {
288 let is_nullable = match t.nullable(input_schema) {
289 Err(e) => return Some(Err(e)),
290 Ok(n) => n,
291 };
292
293 if !is_nullable {
296 return None;
297 }
298
299 if case.expr.is_some() {
301 return Some(Ok(()));
302 }
303
304 let bounds = match predicate_bounds::evaluate_bounds(
308 w,
309 Some(unwrap_certainly_null_expr(t)),
310 input_schema,
311 ) {
312 Err(e) => return Some(Err(e)),
313 Ok(b) => b,
314 };
315
316 let can_be_true = match bounds
317 .contains_value(ScalarValue::Boolean(Some(true)))
318 {
319 Err(e) => return Some(Err(e)),
320 Ok(b) => b,
321 };
322
323 if !can_be_true {
324 None
328 } else {
329 Some(Ok(()))
331 }
332 })
333 .next();
334
335 if let Some(nullable_then) = nullable_then {
336 nullable_then.map(|_| true)
340 } else if let Some(e) = &case.else_expr {
341 e.nullable(input_schema)
344 } else {
345 Ok(true)
348 }
349 }
350 Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
351 Expr::ScalarFunction(_func) => {
352 let field = self.to_field(input_schema)?.1;
353
354 let nullable = field.is_nullable();
355 Ok(nullable)
356 }
357 Expr::AggregateFunction(AggregateFunction { func, .. }) => {
358 Ok(func.is_nullable())
359 }
360 Expr::WindowFunction(window_function) => self
361 .data_type_and_nullable_with_window_function(
362 input_schema,
363 window_function,
364 )
365 .map(|(_, nullable)| nullable),
366 Expr::ScalarVariable(field, _) => Ok(field.is_nullable()),
367 Expr::TryCast { .. } | Expr::Unnest(_) | Expr::Placeholder(_) => Ok(true),
368 Expr::IsNull(_)
369 | Expr::IsNotNull(_)
370 | Expr::IsTrue(_)
371 | Expr::IsFalse(_)
372 | Expr::IsUnknown(_)
373 | Expr::IsNotTrue(_)
374 | Expr::IsNotFalse(_)
375 | Expr::IsNotUnknown(_)
376 | Expr::Exists { .. } => Ok(false),
377 Expr::InSubquery(InSubquery { expr, .. }) => expr.nullable(input_schema),
378 Expr::ScalarSubquery(subquery) => {
379 Ok(subquery.subquery.schema().field(0).is_nullable())
380 }
381 Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
382 Ok(left.nullable(input_schema)? || right.nullable(input_schema)?)
383 }
384 Expr::Like(Like { expr, pattern, .. })
385 | Expr::SimilarTo(Like { expr, pattern, .. }) => {
386 Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
387 }
388 #[expect(deprecated)]
389 Expr::Wildcard { .. } => Ok(false),
390 Expr::GroupingSet(_) => {
391 Ok(true)
394 }
395 }
396 }
397
398 fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata> {
399 self.to_field(schema)
400 .map(|(_, field)| FieldMetadata::from(field.metadata()))
401 }
402
403 fn data_type_and_nullable(
414 &self,
415 schema: &dyn ExprSchema,
416 ) -> Result<(DataType, bool)> {
417 let field = self.to_field(schema)?.1;
418
419 Ok((field.data_type().clone(), field.is_nullable()))
420 }
421
422 fn to_field(
473 &self,
474 schema: &dyn ExprSchema,
475 ) -> Result<(Option<TableReference>, Arc<Field>)> {
476 let (relation, schema_name) = self.qualified_name();
477 #[expect(deprecated)]
478 let field = match self {
479 Expr::Alias(Alias {
480 expr,
481 name: _,
482 metadata,
483 ..
484 }) => {
485 let mut combined_metadata = expr.metadata(schema)?;
486 if let Some(metadata) = metadata {
487 combined_metadata.extend(metadata.clone());
488 }
489
490 Ok(expr
491 .to_field(schema)
492 .map(|(_, f)| f)?
493 .with_field_metadata(&combined_metadata))
494 }
495 Expr::Negative(expr) => expr.to_field(schema).map(|(_, f)| f),
496 Expr::Column(c) => schema.field_from_column(c).map(Arc::clone),
497 Expr::OuterReferenceColumn(field, _) => {
498 Ok(Arc::clone(field).renamed(&schema_name))
499 }
500 Expr::ScalarVariable(field, _) => Ok(Arc::clone(field).renamed(&schema_name)),
501 Expr::Literal(l, metadata) => Ok(Arc::new(
502 Field::new(&schema_name, l.data_type(), l.is_null())
503 .with_field_metadata_opt(metadata.as_ref()),
504 )),
505 Expr::IsNull(_)
506 | Expr::IsNotNull(_)
507 | Expr::IsTrue(_)
508 | Expr::IsFalse(_)
509 | Expr::IsUnknown(_)
510 | Expr::IsNotTrue(_)
511 | Expr::IsNotFalse(_)
512 | Expr::IsNotUnknown(_)
513 | Expr::Exists { .. } => {
514 Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false)))
515 }
516 Expr::ScalarSubquery(subquery) => {
517 Ok(Arc::clone(&subquery.subquery.schema().fields()[0]))
518 }
519 Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
520 let (left_field, right_field) =
521 (left.to_field(schema)?.1, right.to_field(schema)?.1);
522
523 let (lhs_type, lhs_nullable) =
524 (left_field.data_type(), left_field.is_nullable());
525 let (rhs_type, rhs_nullable) =
526 (right_field.data_type(), right_field.is_nullable());
527 let mut coercer = BinaryTypeCoercer::new(lhs_type, op, rhs_type);
528 coercer.set_lhs_spans(left.spans().cloned().unwrap_or_default());
529 coercer.set_rhs_spans(right.spans().cloned().unwrap_or_default());
530 Ok(Arc::new(Field::new(
531 &schema_name,
532 coercer.get_result_type()?,
533 lhs_nullable || rhs_nullable,
534 )))
535 }
536 Expr::WindowFunction(window_function) => {
537 let (dt, nullable) = self.data_type_and_nullable_with_window_function(
538 schema,
539 window_function,
540 )?;
541 Ok(Arc::new(Field::new(&schema_name, dt, nullable)))
542 }
543 Expr::AggregateFunction(aggregate_function) => {
544 let AggregateFunction {
545 func,
546 params: AggregateFunctionParams { args, .. },
547 ..
548 } = aggregate_function;
549
550 let fields = args
551 .iter()
552 .map(|e| e.to_field(schema).map(|(_, f)| f))
553 .collect::<Result<Vec<_>>>()?;
554 let new_fields = fields_with_udf(&fields, func.as_ref())
556 .map_err(|err| {
557 let arg_types = fields
558 .iter()
559 .map(|f| f.data_type())
560 .cloned()
561 .collect::<Vec<_>>();
562 plan_datafusion_err!(
563 "{} {}",
564 match err {
565 DataFusionError::Plan(msg) => msg,
566 err => err.to_string(),
567 },
568 utils::generate_signature_error_msg(
569 func.name(),
570 func.signature().clone(),
571 &arg_types,
572 )
573 )
574 })?
575 .into_iter()
576 .collect::<Vec<_>>();
577
578 func.return_field(&new_fields)
579 }
580 Expr::ScalarFunction(ScalarFunction { func, args }) => {
581 let (arg_types, fields): (Vec<DataType>, Vec<Arc<Field>>) = args
582 .iter()
583 .map(|e| e.to_field(schema).map(|(_, f)| f))
584 .collect::<Result<Vec<_>>>()?
585 .into_iter()
586 .map(|f| (f.data_type().clone(), f))
587 .unzip();
588 let new_fields =
590 fields_with_udf(&fields, func.as_ref()).map_err(|err| {
591 plan_datafusion_err!(
592 "{} {}",
593 match err {
594 DataFusionError::Plan(msg) => msg,
595 err => err.to_string(),
596 },
597 utils::generate_signature_error_msg(
598 func.name(),
599 func.signature().clone(),
600 &arg_types,
601 )
602 )
603 })?;
604
605 let arguments = args
606 .iter()
607 .map(|e| match e {
608 Expr::Literal(sv, _) => Some(sv),
609 _ => None,
610 })
611 .collect::<Vec<_>>();
612 let args = ReturnFieldArgs {
613 arg_fields: &new_fields,
614 scalar_arguments: &arguments,
615 };
616
617 func.return_field_from_args(args)
618 }
619 Expr::Cast(Cast { expr, data_type }) => expr
621 .to_field(schema)
622 .map(|(_, f)| f.retyped(data_type.clone())),
623 Expr::Placeholder(Placeholder {
624 id: _,
625 field: Some(field),
626 }) => Ok(Arc::clone(field).renamed(&schema_name)),
627 Expr::Like(_)
628 | Expr::SimilarTo(_)
629 | Expr::Not(_)
630 | Expr::Between(_)
631 | Expr::Case(_)
632 | Expr::TryCast(_)
633 | Expr::InList(_)
634 | Expr::InSubquery(_)
635 | Expr::Wildcard { .. }
636 | Expr::GroupingSet(_)
637 | Expr::Placeholder(_)
638 | Expr::Unnest(_) => Ok(Arc::new(Field::new(
639 &schema_name,
640 self.get_type(schema)?,
641 self.nullable(schema)?,
642 ))),
643 }?;
644
645 Ok((
646 relation,
647 field.renamed(&schema_name),
649 ))
650 }
651
652 fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr> {
659 let this_type = self.get_type(schema)?;
660 if this_type == *cast_to_type {
661 return Ok(self);
662 }
663
664 if can_cast_types(&this_type, cast_to_type) {
669 match self {
670 Expr::ScalarSubquery(subquery) => {
671 Ok(Expr::ScalarSubquery(cast_subquery(subquery, cast_to_type)?))
672 }
673 _ => Ok(Expr::Cast(Cast::new(Box::new(self), cast_to_type.clone()))),
674 }
675 } else {
676 plan_err!("Cannot automatically convert {this_type} to {cast_to_type}")
677 }
678 }
679}
680
681fn unwrap_certainly_null_expr(expr: &Expr) -> &Expr {
683 match expr {
684 Expr::Not(e) => unwrap_certainly_null_expr(e),
685 Expr::Negative(e) => unwrap_certainly_null_expr(e),
686 Expr::Cast(e) => unwrap_certainly_null_expr(e.expr.as_ref()),
687 _ => expr,
688 }
689}
690
691impl Expr {
692 fn data_type_and_nullable_with_window_function(
702 &self,
703 schema: &dyn ExprSchema,
704 window_function: &WindowFunction,
705 ) -> Result<(DataType, bool)> {
706 let WindowFunction {
707 fun,
708 params: WindowFunctionParams { args, .. },
709 ..
710 } = window_function;
711
712 let fields = args
713 .iter()
714 .map(|e| e.to_field(schema).map(|(_, f)| f))
715 .collect::<Result<Vec<_>>>()?;
716 match fun {
717 WindowFunctionDefinition::AggregateUDF(udaf) => {
718 let data_types = fields
719 .iter()
720 .map(|f| f.data_type())
721 .cloned()
722 .collect::<Vec<_>>();
723 let new_fields = fields_with_udf(&fields, udaf.as_ref())
724 .map_err(|err| {
725 plan_datafusion_err!(
726 "{} {}",
727 match err {
728 DataFusionError::Plan(msg) => msg,
729 err => err.to_string(),
730 },
731 utils::generate_signature_error_msg(
732 fun.name(),
733 fun.signature(),
734 &data_types
735 )
736 )
737 })?
738 .into_iter()
739 .collect::<Vec<_>>();
740
741 let return_field = udaf.return_field(&new_fields)?;
742
743 Ok((return_field.data_type().clone(), return_field.is_nullable()))
744 }
745 WindowFunctionDefinition::WindowUDF(udwf) => {
746 let data_types = fields
747 .iter()
748 .map(|f| f.data_type())
749 .cloned()
750 .collect::<Vec<_>>();
751 let new_fields = fields_with_udf(&fields, udwf.as_ref())
752 .map_err(|err| {
753 plan_datafusion_err!(
754 "{} {}",
755 match err {
756 DataFusionError::Plan(msg) => msg,
757 err => err.to_string(),
758 },
759 utils::generate_signature_error_msg(
760 fun.name(),
761 fun.signature(),
762 &data_types
763 )
764 )
765 })?
766 .into_iter()
767 .collect::<Vec<_>>();
768 let (_, function_name) = self.qualified_name();
769 let field_args = WindowUDFFieldArgs::new(&new_fields, &function_name);
770
771 udwf.field(field_args)
772 .map(|field| (field.data_type().clone(), field.is_nullable()))
773 }
774 }
775 }
776}
777
778pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
786 if subquery.subquery.schema().field(0).data_type() == cast_to_type {
787 return Ok(subquery);
788 }
789
790 let plan = subquery.subquery.as_ref();
791 let new_plan = match plan {
792 LogicalPlan::Projection(projection) => {
793 let cast_expr = projection.expr[0]
794 .clone()
795 .cast_to(cast_to_type, projection.input.schema())?;
796 LogicalPlan::Projection(Projection::try_new(
797 vec![cast_expr],
798 Arc::clone(&projection.input),
799 )?)
800 }
801 _ => {
802 let cast_expr = Expr::Column(Column::from(plan.schema().qualified_field(0)))
803 .cast_to(cast_to_type, subquery.subquery.schema())?;
804 LogicalPlan::Projection(Projection::try_new(
805 vec![cast_expr],
806 subquery.subquery,
807 )?)
808 }
809 };
810 Ok(Subquery {
811 subquery: Arc::new(new_plan),
812 outer_ref_columns: subquery.outer_ref_columns,
813 spans: Spans::new(),
814 })
815}
816
/// Unit tests for the [`ExprSchemable`] implementation on [`Expr`], run
/// against a single-field mock schema.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::{and, col, lit, not, or, out_ref_col_with_metadata, when};

    use arrow::datatypes::FieldRef;
    use datafusion_common::{DFSchema, ScalarValue, assert_or_internal_err};

    /// Asserts that applying the given `IS ...` predicate builder to a null
    /// literal produces a non-nullable expression.
    macro_rules! test_is_expr_nullable {
        ($EXPR_TYPE:ident) => {{
            let expr = lit(ScalarValue::Null).$EXPR_TYPE();
            assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        }};
    }

    #[test]
    fn expr_schema_nullability() {
        // A comparison is nullable exactly when its column operand is.
        let expr = col("foo").eq(lit(1));
        assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        assert!(
            expr.nullable(&MockExprSchema::new().with_nullable(true))
                .unwrap()
        );

        test_is_expr_nullable!(is_null);
        test_is_expr_nullable!(is_not_null);
        test_is_expr_nullable!(is_true);
        test_is_expr_nullable!(is_not_true);
        test_is_expr_nullable!(is_false);
        test_is_expr_nullable!(is_not_false);
        test_is_expr_nullable!(is_unknown);
        test_is_expr_nullable!(is_not_unknown);
    }

    #[test]
    fn test_between_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        // Non-null bounds: nullability follows the tested column.
        let expr = col("foo").between(lit(1), lit(2));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        let null = lit(ScalarValue::Int32(None));

        // A null bound on either side makes the expression nullable.
        let expr = col("foo").between(null.clone(), lit(2));
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(lit(1), null.clone());
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(null.clone(), null);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    /// Asserts that `expr.nullable(schema)` equals `expected`.
    fn assert_nullability(expr: &Expr, schema: &dyn ExprSchema, expected: bool) {
        assert_eq!(
            expr.nullable(schema).unwrap(),
            expected,
            "Nullability of '{expr}' should be {expected}"
        );
    }

    /// Asserts that `expr` is not nullable under `schema`.
    fn assert_not_nullable(expr: &Expr, schema: &dyn ExprSchema) {
        assert_nullability(expr, schema, false);
    }

    /// Asserts that `expr` is nullable under `schema`.
    fn assert_nullable(expr: &Expr, schema: &dyn ExprSchema) {
        assert_nullability(expr, schema, true);
    }

    #[test]
    fn test_case_expression_nullability() -> Result<()> {
        let nullable_schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_nullable(true);

        let not_nullable_schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_nullable(false);

        // CASE WHEN x IS NOT NULL THEN x ELSE 0
        let e = when(col("x").is_not_null(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN NOT x IS NULL THEN x ELSE 0
        let e = when(not(col("x").is_null()), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 THEN x ELSE 0
        let e = when(col("x").eq(lit(5)), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT NULL AND x = 5 THEN x ELSE 0
        let e = when(and(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 AND x IS NOT NULL THEN x ELSE 0
        let e = when(and(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT NULL OR x = 5 THEN x ELSE 0
        let e = when(or(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 OR x IS NOT NULL THEN x ELSE 0
        let e = when(or(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
            .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN (x = 5 AND x IS NOT NULL) OR (x = bar AND x IS NOT NULL)
        //      THEN x ELSE 0
        let e = when(
            or(
                and(col("x").eq(lit(5)), col("x").is_not_null()),
                and(col("x").eq(col("bar")), col("x").is_not_null()),
            ),
            col("x"),
        )
        .otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x = 5 OR x IS NULL THEN x ELSE 0
        let e = when(or(col("x").eq(lit(5)), col("x").is_null()), col("x"))
            .otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS TRUE THEN x ELSE 0
        let e = when(col("x").is_true(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT TRUE THEN x ELSE 0
        let e = when(col("x").is_not_true(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS FALSE THEN x ELSE 0
        let e = when(col("x").is_false(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT FALSE THEN x ELSE 0
        let e = when(col("x").is_not_false(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS UNKNOWN THEN x ELSE 0
        let e = when(col("x").is_unknown(), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x IS NOT UNKNOWN THEN x ELSE 0
        let e = when(col("x").is_not_unknown(), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN x LIKE 'x' THEN x ELSE 0
        let e = when(col("x").like(lit("x")), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN 0 THEN x ELSE 0
        let e = when(lit(0), col("x")).otherwise(lit(0))?;
        assert_not_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        // CASE WHEN 1 THEN x ELSE 0
        let e = when(lit(1), col("x")).otherwise(lit(0))?;
        assert_nullable(&e, &nullable_schema);
        assert_not_nullable(&e, &not_nullable_schema);

        Ok(())
    }

    #[test]
    fn test_inlist_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        // Column plus five literals: all six expressions are inspected.
        let expr = col("foo").in_list(vec![lit(1); 5], false);
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());
        // An error raised while checking nullability is surfaced.
        assert!(
            expr.nullable(&get_schema(false).with_error_on_nullable(true))
                .is_err()
        );

        // A null list item makes the expression nullable.
        let null = lit(ScalarValue::Int32(None));
        let expr = col("foo").in_list(vec![null, lit(1)], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());

        // Column plus six literals exceeds the inspection limit, so the
        // result is conservatively nullable.
        let expr = col("foo").in_list(vec![lit(1); 6], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    #[test]
    fn test_like_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Utf8)
                .with_nullable(nullable)
        };

        let expr = col("foo").like(lit("bar"));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        // A null pattern makes the expression nullable.
        let expr = col("foo").like(lit(ScalarValue::Utf8(None)));
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    #[test]
    fn expr_schema_data_type() {
        let expr = col("foo");
        assert_eq!(
            DataType::Utf8,
            expr.get_type(&MockExprSchema::new().with_data_type(DataType::Utf8))
                .unwrap()
        );
    }

    #[test]
    fn test_expr_metadata() {
        let mut meta = HashMap::new();
        meta.insert("bar".to_string(), "buzz".to_string());
        let meta = FieldMetadata::from(meta);
        let expr = col("foo");
        let schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_metadata(meta.clone());

        // Column metadata is visible on the column itself, through an
        // alias, and through a cast.
        assert_eq!(meta, expr.metadata(&schema).unwrap());
        assert_eq!(meta, expr.clone().alias("bar").metadata(&schema).unwrap());
        assert_eq!(
            meta,
            expr.clone()
                .cast_to(&DataType::Int64, &schema)
                .unwrap()
                .metadata(&schema)
                .unwrap()
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![meta.add_to_field(Field::new("foo", DataType::Int32, true))].into(),
            HashMap::new(),
        )
        .unwrap();

        // The same holds when resolving against a DFSchema.
        assert_eq!(meta, expr.metadata(&schema).unwrap());

        // An outer reference column reports the metadata it was built with.
        let outer_ref = out_ref_col_with_metadata(
            DataType::Int32,
            meta.to_hashmap(),
            Column::from_name("foo"),
        );
        assert_eq!(meta, outer_ref.metadata(&schema).unwrap());
    }

    #[test]
    fn test_expr_placeholder() {
        let schema = MockExprSchema::new();

        let mut placeholder_meta = HashMap::new();
        placeholder_meta.insert("bar".to_string(), "buzz".to_string());
        let placeholder_meta = FieldMetadata::from(placeholder_meta);

        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(
                Field::new("", DataType::Utf8, true)
                    .with_metadata(placeholder_meta.to_hashmap())
                    .into(),
            ),
        ));

        // Type, nullability and metadata come from the placeholder's field.
        let field = expr.to_field(&schema).unwrap().1;
        assert_eq!(
            (field.data_type(), field.is_nullable()),
            (&DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr.metadata(&schema).unwrap());

        // Aliasing the placeholder preserves all three.
        let expr_alias = expr.alias("a placeholder by any other name");
        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
            (&DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr_alias.metadata(&schema).unwrap());

        // A non-nullable placeholder field stays non-nullable.
        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(Field::new("", DataType::Utf8, false).into()),
        ));
        let expr_field = expr.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_field.data_type(), expr_field.is_nullable()),
            (&DataType::Utf8, false)
        );

        let expr_alias = expr.alias("a placeholder by any other name");
        let expr_alias_field = expr_alias.to_field(&schema).unwrap().1;
        assert_eq!(
            (expr_alias_field.data_type(), expr_alias_field.is_nullable()),
            (&DataType::Utf8, false)
        );
    }

    /// Schema stub that answers every column lookup with the same field.
    #[derive(Debug)]
    struct MockExprSchema {
        // Field returned for every column.
        field: FieldRef,
        // When set, `nullable` fails instead of answering.
        error_on_nullable: bool,
    }

    impl MockExprSchema {
        /// Creates a schema whose field is a non-nullable `Null`-typed
        /// field named `mock_field`.
        fn new() -> Self {
            Self {
                field: Arc::new(Field::new("mock_field", DataType::Null, false)),
                error_on_nullable: false,
            }
        }

        /// Sets the nullability of the mock field.
        fn with_nullable(mut self, nullable: bool) -> Self {
            Arc::make_mut(&mut self.field).set_nullable(nullable);
            self
        }

        /// Sets the data type of the mock field.
        fn with_data_type(mut self, data_type: DataType) -> Self {
            Arc::make_mut(&mut self.field).set_data_type(data_type);
            self
        }

        /// Makes `nullable` return an error when `error_on_nullable` is true.
        fn with_error_on_nullable(mut self, error_on_nullable: bool) -> Self {
            self.error_on_nullable = error_on_nullable;
            self
        }

        /// Attaches `metadata` to the mock field.
        fn with_metadata(mut self, metadata: FieldMetadata) -> Self {
            self.field =
                Arc::new(metadata.add_to_field(Arc::unwrap_or_clone(self.field)));
            self
        }
    }

    impl ExprSchema for MockExprSchema {
        fn nullable(&self, _col: &Column) -> Result<bool> {
            assert_or_internal_err!(!self.error_on_nullable, "nullable error");
            Ok(self.field.is_nullable())
        }

        fn field_from_column(&self, _col: &Column) -> Result<&FieldRef> {
            Ok(&self.field)
        }
    }

    #[test]
    fn test_scalar_variable() {
        let mut meta = HashMap::new();
        meta.insert("bar".to_string(), "buzz".to_string());
        let meta = FieldMetadata::from(meta);

        let field = Field::new("foo", DataType::Int32, true);
        let field = meta.add_to_field(field);
        let field = Arc::new(field);

        let expr = Expr::ScalarVariable(field, vec!["foo".to_string()]);

        let schema = MockExprSchema::new();

        // The scalar variable reports the metadata of its own field.
        assert_eq!(meta, expr.metadata(&schema).unwrap());
    }
}