1use std::collections::HashSet;
4use std::fmt::{Display, Formatter};
5use std::sync::Arc;
6
7use itertools::Itertools;
8use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
9
10pub use self::column_names::{
11 col, column_expr, column_expr_ref, column_name, column_pred, joined_column_expr,
12 joined_column_name, ColumnName,
13};
14pub use self::scalars::{ArrayData, DecimalData, MapData, Scalar, StructData};
15use crate::kernel_predicates::{
16 DirectDataSkippingPredicateEvaluator, DirectPredicateEvaluator,
17 IndirectDataSkippingPredicateEvaluator,
18};
19use crate::schema::SchemaRef;
20pub use crate::struct_patch::{ExpressionFieldPatch, ExpressionStructPatch};
21use crate::transforms::{transform_output_type, ExpressionTransform};
22use crate::utils::CollectInto;
23use crate::{DataType, DeltaResult, DynPartialEq, Error};
24
25mod column_names;
26pub(crate) mod literal_expression_transform;
27pub(crate) use literal_expression_transform::literal_expression_transform;
28mod scalars;
29#[cfg(feature = "column-defaults-in-dev")]
30mod sql;
31#[cfg(feature = "column-defaults-in-dev")]
32#[allow(unused_imports)]
34pub(crate) use self::sql::parse_sql;
35
36pub type ExpressionRef = std::sync::Arc<Expression>;
37pub type PredicateRef = std::sync::Arc<Predicate>;
38
39pub fn lit(value: impl Into<Scalar>) -> Expression {
49 Expression::literal(value)
50}
51
52pub type ExpressionStructPatchBuilder = crate::struct_patch::StructPatchBuilder<ExpressionRef>;
56
57#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
63pub enum UnaryPredicateOp {
64 IsNull,
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
70pub enum BinaryPredicateOp {
71 LessThan,
73 GreaterThan,
75 Equal,
77 Distinct,
79 In,
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
85pub enum UnaryExpressionOp {
86 ToJson,
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
92pub enum BinaryExpressionOp {
93 Plus,
95 Minus,
97 Multiply,
99 Divide,
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
105pub enum VariadicExpressionOp {
106 Coalesce,
108 Array,
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
121pub enum JunctionPredicateOp {
122 And,
124 Or,
126}
127
128pub type ScalarExpressionEvaluator<'a> = dyn Fn(&Expression) -> Option<Scalar> + 'a;
136
137pub trait OpaqueExpressionOp: DynPartialEq + std::fmt::Debug {
139 fn name(&self) -> &str;
141
142 fn eval_expr_scalar(
153 &self,
154 eval_expr: &ScalarExpressionEvaluator<'_>,
155 exprs: &[Expression],
156 ) -> DeltaResult<Scalar>;
157}
158
159pub trait OpaquePredicateOp: DynPartialEq + std::fmt::Debug {
161 fn name(&self) -> &str;
163
164 fn eval_pred_scalar(
177 &self,
178 eval_expr: &ScalarExpressionEvaluator<'_>,
179 eval_pred: &DirectPredicateEvaluator<'_>,
180 exprs: &[Expression],
181 inverted: bool,
182 ) -> DeltaResult<Option<bool>>;
183
184 fn eval_as_data_skipping_predicate(
194 &self,
195 evaluator: &DirectDataSkippingPredicateEvaluator<'_>,
196 exprs: &[Expression],
197 inverted: bool,
198 ) -> Option<bool>;
199
200 fn as_data_skipping_predicate(
213 &self,
214 evaluator: &IndirectDataSkippingPredicateEvaluator<'_>,
215 exprs: &[Expression],
216 inverted: bool,
217 ) -> Option<Predicate>;
218}
219
220pub type OpaqueExpressionOpRef = Arc<dyn OpaqueExpressionOp>;
222
223pub type OpaquePredicateOpRef = Arc<dyn OpaquePredicateOp>;
225
226#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
231pub struct UnaryPredicate {
232 pub op: UnaryPredicateOp,
234 pub expr: Box<Expression>,
236}
237
238#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
239pub struct BinaryPredicate {
240 pub op: BinaryPredicateOp,
242 pub left: Box<Expression>,
244 pub right: Box<Expression>,
246}
247
248#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
249pub struct UnaryExpression {
250 pub op: UnaryExpressionOp,
252 pub expr: Box<Expression>,
254}
255
256#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
257pub struct BinaryExpression {
258 pub op: BinaryExpressionOp,
260 pub left: Box<Expression>,
262 pub right: Box<Expression>,
264}
265
266#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
267pub struct VariadicExpression {
268 pub op: VariadicExpressionOp,
270 pub exprs: Vec<Expression>,
272}
273
274#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
278pub struct ParseJsonExpression {
279 pub json_expr: Box<Expression>,
281 pub output_schema: SchemaRef,
283}
284
285#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
286pub struct JunctionPredicate {
287 pub op: JunctionPredicateOp,
289 pub preds: Vec<Predicate>,
291}
292
293#[derive(Clone, Debug)]
300pub struct OpaquePredicate {
301 pub op: OpaquePredicateOpRef,
302 pub exprs: Vec<Expression>,
303}
304fn fail_serialize_opaque_predicate<S>(
305 _value: &OpaquePredicate,
306 _serializer: S,
307) -> Result<S::Ok, S::Error>
308where
309 S: Serializer,
310{
311 Err(ser::Error::custom("Cannot serialize an Opaque Predicate"))
312}
313
314fn fail_deserialize_opaque_predicate<'de, D>(_deserializer: D) -> Result<OpaquePredicate, D::Error>
315where
316 D: Deserializer<'de>,
317{
318 Err(de::Error::custom("Cannot deserialize an Opaque Predicate"))
319}
320
321impl OpaquePredicate {
322 pub(crate) fn new(
323 op: OpaquePredicateOpRef,
324 exprs: impl IntoIterator<Item = Expression>,
325 ) -> Self {
326 let exprs = exprs.into_iter().collect();
327 Self { op, exprs }
328 }
329}
330
331#[derive(Clone, Debug)]
338pub struct OpaqueExpression {
339 pub op: OpaqueExpressionOpRef,
340 pub exprs: Vec<Expression>,
341}
342
343impl OpaqueExpression {
344 pub(crate) fn new(
345 op: OpaqueExpressionOpRef,
346 exprs: impl IntoIterator<Item = Expression>,
347 ) -> Self {
348 let exprs = exprs.into_iter().collect();
349 Self { op, exprs }
350 }
351}
352
353fn fail_serialize_opaque_expression<S>(
354 _value: &OpaqueExpression,
355 _serializer: S,
356) -> Result<S::Ok, S::Error>
357where
358 S: Serializer,
359{
360 Err(ser::Error::custom("Cannot serialize an Opaque Expression"))
361}
362
363fn fail_deserialize_opaque_expression<'de, D>(
364 _deserializer: D,
365) -> Result<OpaqueExpression, D::Error>
366where
367 D: Deserializer<'de>,
368{
369 Err(de::Error::custom("Cannot deserialize an Opaque Expression"))
370}
371
372#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
378pub enum Expression {
379 Literal(Scalar),
381 Column(ColumnName),
383 Predicate(Box<Predicate>), Struct(Vec<ExpressionRef>, Option<ExpressionRef>),
389 #[serde(alias = "Transform")]
392 StructPatch(ExpressionStructPatch),
393 Unary(UnaryExpression),
395 Binary(BinaryExpression),
397 Variadic(VariadicExpression),
399 #[serde(serialize_with = "fail_serialize_opaque_expression")]
402 #[serde(deserialize_with = "fail_deserialize_opaque_expression")]
403 Opaque(OpaqueExpression),
404 Unknown(String),
413 ParseJson(ParseJsonExpression),
415 MapToStruct(MapToStructExpression),
418}
419
420#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
426pub enum Predicate {
427 BooleanExpression(Expression),
429 Not(Box<Predicate>),
436 Unary(UnaryPredicate),
438 Binary(BinaryPredicate),
440 Junction(JunctionPredicate),
442 #[serde(serialize_with = "fail_serialize_opaque_predicate")]
445 #[serde(deserialize_with = "fail_deserialize_opaque_predicate")]
446 Opaque(OpaquePredicate),
447 Unknown(String),
456}
457
458impl BinaryPredicateOp {
463 pub(crate) fn is_null_intolerant(&self) -> bool {
465 use BinaryPredicateOp::*;
466 match self {
467 LessThan | GreaterThan | Equal => true,
468 Distinct | In => false, }
470 }
471}
472
473impl JunctionPredicateOp {
474 pub(crate) fn invert(&self) -> JunctionPredicateOp {
475 use JunctionPredicateOp::*;
476 match self {
477 And => Or,
478 Or => And,
479 }
480 }
481}
482
483impl UnaryExpression {
484 pub(crate) fn new(op: UnaryExpressionOp, expr: impl Into<Expression>) -> Self {
485 let expr = Box::new(expr.into());
486 Self { op, expr }
487 }
488}
489
490impl UnaryPredicate {
491 pub(crate) fn new(op: UnaryPredicateOp, expr: impl Into<Expression>) -> Self {
492 let expr = Box::new(expr.into());
493 Self { op, expr }
494 }
495}
496
497impl BinaryExpression {
498 pub(crate) fn new(
499 op: BinaryExpressionOp,
500 left: impl Into<Expression>,
501 right: impl Into<Expression>,
502 ) -> Self {
503 let left = Box::new(left.into());
504 let right = Box::new(right.into());
505 Self { op, left, right }
506 }
507}
508
509impl BinaryPredicate {
510 pub(crate) fn new(
511 op: BinaryPredicateOp,
512 left: impl Into<Expression>,
513 right: impl Into<Expression>,
514 ) -> Self {
515 let left = Box::new(left.into());
516 let right = Box::new(right.into());
517 Self { op, left, right }
518 }
519}
520
521impl VariadicExpression {
522 pub(crate) fn new(
523 op: VariadicExpressionOp,
524 exprs: impl IntoIterator<Item = impl Into<Expression>>,
525 ) -> Self {
526 let exprs = exprs.into_iter().map(Into::into).collect();
527 Self { op, exprs }
528 }
529}
530
531impl ParseJsonExpression {
532 pub(crate) fn new(json_expr: impl Into<Expression>, output_schema: SchemaRef) -> Self {
533 Self {
534 json_expr: Box::new(json_expr.into()),
535 output_schema,
536 }
537 }
538}
539
540#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
552pub struct MapToStructExpression {
553 pub map_expr: Box<Expression>,
555}
556
557impl MapToStructExpression {
558 pub(crate) fn new(map_expr: impl Into<Expression>) -> Self {
559 Self {
560 map_expr: Box::new(map_expr.into()),
561 }
562 }
563}
564
565impl JunctionPredicate {
566 pub(crate) fn new(op: JunctionPredicateOp, preds: Vec<Predicate>) -> Self {
567 Self { op, preds }
568 }
569}
570
571impl Expression {
572 pub fn references(&self) -> HashSet<&ColumnName> {
574 let mut references = GetColumnReferences::default();
575 references.transform_expr(self);
576 references.0
577 }
578
579 pub fn column(field_names: impl CollectInto<ColumnName>) -> Expression {
581 ColumnName::new(field_names).into()
582 }
583
584 pub fn literal(value: impl Into<Scalar>) -> Self {
586 Self::Literal(value.into())
587 }
588
589 pub const fn null_literal(data_type: DataType) -> Self {
591 Self::Literal(Scalar::Null(data_type))
592 }
593
594 pub fn from_pred(value: Predicate) -> Self {
596 match value {
597 Predicate::BooleanExpression(expr) => expr,
598 _ => Self::Predicate(Box::new(value)),
599 }
600 }
601
602 pub fn struct_from(exprs: impl IntoIterator<Item = impl Into<Arc<Self>>>) -> Self {
609 Self::Struct(exprs.into_iter().map(Into::into).collect(), None)
610 }
611
612 pub fn struct_with_nullability_from(
617 exprs: impl IntoIterator<Item = impl Into<Arc<Self>>>,
618 nullability_predicate: impl Into<Arc<Self>>,
619 ) -> Self {
620 Self::Struct(
621 exprs.into_iter().map(Into::into).collect(),
622 Some(nullability_predicate.into()),
623 )
624 }
625
626 pub fn struct_patch<P>(patch: P) -> DeltaResult<Self>
636 where
637 P: TryInto<ExpressionStructPatch>,
638 Error: From<P::Error>,
639 {
640 Ok(Self::StructPatch(patch.try_into()?))
641 }
642
643 pub fn is_null(self) -> Predicate {
645 Predicate::is_null(self)
646 }
647
648 pub fn is_not_null(self) -> Predicate {
650 Predicate::is_not_null(self)
651 }
652
653 pub fn eq(self, other: impl Into<Self>) -> Predicate {
655 Predicate::eq(self, other)
656 }
657
658 pub fn ne(self, other: impl Into<Self>) -> Predicate {
660 Predicate::ne(self, other)
661 }
662
663 pub fn le(self, other: impl Into<Self>) -> Predicate {
665 Predicate::le(self, other)
666 }
667
668 pub fn lt(self, other: impl Into<Self>) -> Predicate {
670 Predicate::lt(self, other)
671 }
672
673 pub fn ge(self, other: impl Into<Self>) -> Predicate {
675 Predicate::ge(self, other)
676 }
677
678 pub fn gt(self, other: impl Into<Self>) -> Predicate {
680 Predicate::gt(self, other)
681 }
682
683 pub fn distinct(self, other: impl Into<Self>) -> Predicate {
685 Predicate::distinct(self, other)
686 }
687
688 pub fn unary(op: UnaryExpressionOp, expr: impl Into<Expression>) -> Self {
690 Self::Unary(UnaryExpression::new(op, expr))
691 }
692
693 pub fn binary(
695 op: BinaryExpressionOp,
696 lhs: impl Into<Expression>,
697 rhs: impl Into<Expression>,
698 ) -> Self {
699 Self::Binary(BinaryExpression::new(op, lhs, rhs))
700 }
701
702 pub fn variadic(
704 op: VariadicExpressionOp,
705 exprs: impl IntoIterator<Item = impl Into<Expression>>,
706 ) -> Self {
707 Self::Variadic(VariadicExpression::new(op, exprs))
708 }
709
710 pub fn coalesce(exprs: impl IntoIterator<Item = impl Into<Expression>>) -> Self {
715 Self::variadic(VariadicExpressionOp::Coalesce, exprs)
716 }
717
718 pub fn array(exprs: impl IntoIterator<Item = impl Into<Expression>>) -> Self {
720 Self::variadic(VariadicExpressionOp::Array, exprs)
721 }
722
723 pub fn opaque(
725 op: impl OpaqueExpressionOp,
726 exprs: impl IntoIterator<Item = Expression>,
727 ) -> Self {
728 Self::Opaque(OpaqueExpression::new(Arc::new(op), exprs))
729 }
730
731 pub fn unknown(name: impl Into<String>) -> Self {
733 Self::Unknown(name.into())
734 }
735
736 pub fn parse_json(json_expr: impl Into<Expression>, output_schema: SchemaRef) -> Self {
739 Self::ParseJson(ParseJsonExpression::new(json_expr, output_schema))
740 }
741
742 pub fn map_to_struct(map_expr: impl Into<Expression>) -> Self {
746 Self::MapToStruct(MapToStructExpression::new(map_expr))
747 }
748}
749
750impl Predicate {
751 pub fn references(&self) -> HashSet<&ColumnName> {
753 let mut references = GetColumnReferences::default();
754 references.transform_pred(self);
755 references.0
756 }
757
758 pub fn column(field_names: impl CollectInto<ColumnName>) -> Predicate {
760 Self::from_expr(ColumnName::new(field_names))
761 }
762
763 pub const fn literal(value: bool) -> Self {
765 Self::BooleanExpression(Expression::Literal(Scalar::Boolean(value)))
766 }
767
768 pub const fn null_literal() -> Self {
770 Self::BooleanExpression(Expression::Literal(Scalar::Null(DataType::BOOLEAN)))
771 }
772
773 pub fn from_expr(expr: impl Into<Expression>) -> Self {
775 match expr.into() {
776 Expression::Predicate(p) => *p,
777 expr => Predicate::BooleanExpression(expr),
778 }
779 }
780
781 pub fn not(pred: impl Into<Self>) -> Self {
783 Self::Not(Box::new(pred.into()))
784 }
785
786 pub fn is_null(expr: impl Into<Expression>) -> Predicate {
788 Self::unary(UnaryPredicateOp::IsNull, expr)
789 }
790
791 pub fn is_not_null(expr: impl Into<Expression>) -> Predicate {
793 Self::not(Self::is_null(expr))
794 }
795
796 pub fn eq(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
798 Self::binary(BinaryPredicateOp::Equal, a, b)
799 }
800
801 pub fn ne(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
803 Self::not(Self::binary(BinaryPredicateOp::Equal, a, b))
804 }
805
806 pub fn le(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
808 Self::not(Self::binary(BinaryPredicateOp::GreaterThan, a, b))
809 }
810
811 pub fn lt(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
813 Self::binary(BinaryPredicateOp::LessThan, a, b)
814 }
815
816 pub fn ge(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
818 Self::not(Self::binary(BinaryPredicateOp::LessThan, a, b))
819 }
820
821 pub fn gt(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
823 Self::binary(BinaryPredicateOp::GreaterThan, a, b)
824 }
825
826 pub fn distinct(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
828 Self::binary(BinaryPredicateOp::Distinct, a, b)
829 }
830
831 pub fn and(a: impl Into<Self>, b: impl Into<Self>) -> Self {
833 Self::and_from([a.into(), b.into()])
834 }
835
836 pub fn or(a: impl Into<Self>, b: impl Into<Self>) -> Self {
838 Self::or_from([a.into(), b.into()])
839 }
840
841 pub fn and_from(preds: impl IntoIterator<Item = Self>) -> Self {
844 Self::junction(JunctionPredicateOp::And, preds)
845 }
846
847 pub fn or_from(preds: impl IntoIterator<Item = Self>) -> Self {
850 Self::junction(JunctionPredicateOp::Or, preds)
851 }
852
853 pub fn unary(op: UnaryPredicateOp, expr: impl Into<Expression>) -> Self {
855 let expr = Box::new(expr.into());
856 Self::Unary(UnaryPredicate { op, expr })
857 }
858
859 pub fn binary(
861 op: BinaryPredicateOp,
862 lhs: impl Into<Expression>,
863 rhs: impl Into<Expression>,
864 ) -> Self {
865 Self::Binary(BinaryPredicate {
866 op,
867 left: Box::new(lhs.into()),
868 right: Box::new(rhs.into()),
869 })
870 }
871
872 pub fn junction(op: JunctionPredicateOp, preds: impl IntoIterator<Item = Self>) -> Self {
880 let mut preds: Vec<_> = preds.into_iter().collect();
881 match preds.len() {
882 0 => match op {
883 JunctionPredicateOp::And => Self::literal(true),
884 JunctionPredicateOp::Or => Self::literal(false),
885 },
886 1 => preds.remove(0),
888 _ => Self::Junction(JunctionPredicate { op, preds }),
889 }
890 }
891
892 pub fn opaque(op: impl OpaquePredicateOp, exprs: impl IntoIterator<Item = Expression>) -> Self {
894 Self::Opaque(OpaquePredicate::new(Arc::new(op), exprs))
895 }
896
897 pub fn unknown(name: impl Into<String>) -> Self {
899 Self::Unknown(name.into())
900 }
901}
902
903impl PartialEq for OpaquePredicate {
908 fn eq(&self, other: &Self) -> bool {
909 self.op.dyn_eq(other.op.any_ref()) && self.exprs == other.exprs
910 }
911}
912
913impl PartialEq for OpaqueExpression {
914 fn eq(&self, other: &Self) -> bool {
915 self.op.dyn_eq(other.op.any_ref()) && self.exprs == other.exprs
916 }
917}
918
919impl Display for UnaryExpressionOp {
920 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
921 use UnaryExpressionOp::*;
922 match self {
923 ToJson => write!(f, "TO_JSON"),
924 }
925 }
926}
927
928impl Display for BinaryExpressionOp {
929 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
930 use BinaryExpressionOp::*;
931 match self {
932 Plus => write!(f, "+"),
933 Minus => write!(f, "-"),
934 Multiply => write!(f, "*"),
935 Divide => write!(f, "/"),
936 }
937 }
938}
939
940impl Display for VariadicExpressionOp {
941 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
942 use VariadicExpressionOp::*;
943 match self {
944 Coalesce => write!(f, "COALESCE"),
945 Array => write!(f, "ARRAY"),
946 }
947 }
948}
949
950impl Display for BinaryPredicateOp {
951 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
952 use BinaryPredicateOp::*;
953 match self {
954 LessThan => write!(f, "<"),
955 GreaterThan => write!(f, ">"),
956 Equal => write!(f, "="),
957 Distinct => write!(f, "DISTINCT"),
961 In => write!(f, "IN"),
962 }
963 }
964}
965
966fn format_child_list<T: Display>(children: &[T]) -> String {
968 children.iter().map(|c| format!("{c}")).join(", ")
969}
970
971impl Display for Expression {
972 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
973 use Expression::*;
974 match self {
975 Literal(l) => write!(f, "{l}"),
976 Column(name) => write!(f, "Column({name})"),
977 Predicate(p) => write!(f, "{p}"),
978 Struct(exprs, _) => write!(f, "Struct({})", format_child_list(exprs)),
979 StructPatch(patch) => {
980 write!(f, "StructPatch(")?;
981 let mut sep = "";
982 if !patch.prepended_fields.is_empty() {
983 let prepended_fields = format_child_list(&patch.prepended_fields);
984 write!(f, "prepend [{prepended_fields}]")?;
985 sep = ", ";
986 }
987 for (field_name, field_patch) in &patch.field_patches {
988 if !field_patch.keep_input && field_patch.insertions.is_empty() {
989 write!(f, "{sep}drop {field_name}")?;
990 sep = ", ";
991 }
992 if !field_patch.insertions.is_empty() {
993 let insertions = format_child_list(&field_patch.insertions);
994 let action = if field_patch.keep_input {
995 "after"
996 } else {
997 "replace/after"
998 };
999 write!(f, "{sep}{action} {field_name} insert [{insertions}]")?;
1000 sep = ", ";
1001 }
1002 }
1003 if !patch.appended_fields.is_empty() {
1004 let appended_fields = format_child_list(&patch.appended_fields);
1005 write!(f, "{sep}append [{appended_fields}]")?;
1006 }
1007 write!(f, ")")
1008 }
1009 Unary(UnaryExpression { op, expr }) => write!(f, "{op}({expr})"),
1010 Binary(BinaryExpression { op, left, right }) => write!(f, "{left} {op} {right}"),
1011 Variadic(VariadicExpression { op, exprs }) => {
1012 write!(f, "{op}({})", format_child_list(exprs))
1013 }
1014 Opaque(OpaqueExpression { op, exprs }) => {
1015 write!(f, "{op:?}({})", format_child_list(exprs))
1016 }
1017 Unknown(name) => write!(f, "<unknown: {name}>"),
1018 ParseJson(p) => {
1019 write!(
1020 f,
1021 "PARSE_JSON({}, <schema:{} fields>)",
1022 p.json_expr,
1023 p.output_schema.fields().len()
1024 )
1025 }
1026 MapToStruct(m) => write!(f, "MAP_TO_STRUCT({})", m.map_expr),
1027 }
1028 }
1029}
1030
1031impl Display for Predicate {
1032 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1033 use Predicate::*;
1034 match self {
1035 BooleanExpression(expr) => write!(f, "{expr}"),
1036 Not(pred) => write!(f, "NOT({pred})"),
1037 Binary(BinaryPredicate {
1038 op: BinaryPredicateOp::Distinct,
1039 left,
1040 right,
1041 }) => write!(f, "DISTINCT({left}, {right})"),
1042 Binary(BinaryPredicate { op, left, right }) => write!(f, "{left} {op} {right}"),
1043 Unary(UnaryPredicate { op, expr }) => match op {
1044 UnaryPredicateOp::IsNull => write!(f, "{expr} IS NULL"),
1045 },
1046 Junction(JunctionPredicate { op, preds }) => {
1047 let op = match op {
1048 JunctionPredicateOp::And => "AND",
1049 JunctionPredicateOp::Or => "OR",
1050 };
1051 write!(f, "{op}({})", format_child_list(preds))
1052 }
1053 Opaque(OpaquePredicate { op, exprs }) => {
1054 write!(f, "{op:?}({})", format_child_list(exprs))
1055 }
1056 Unknown(name) => write!(f, "<unknown: {name}>"),
1057 }
1058 }
1059}
1060
1061impl From<Scalar> for Expression {
1062 fn from(value: Scalar) -> Self {
1063 Self::literal(value)
1064 }
1065}
1066
1067impl From<ColumnName> for Expression {
1068 fn from(value: ColumnName) -> Self {
1069 Self::Column(value)
1070 }
1071}
1072
1073impl From<Predicate> for Expression {
1074 fn from(value: Predicate) -> Self {
1075 Self::from_pred(value)
1076 }
1077}
1078
1079impl From<ColumnName> for Predicate {
1080 fn from(value: ColumnName) -> Self {
1081 Self::from_expr(value)
1082 }
1083}
1084
1085impl<R: Into<Expression>> std::ops::Add<R> for Expression {
1086 type Output = Self;
1087
1088 fn add(self, rhs: R) -> Self::Output {
1089 Self::binary(BinaryExpressionOp::Plus, self, rhs)
1090 }
1091}
1092
1093impl<R: Into<Expression>> std::ops::Sub<R> for Expression {
1094 type Output = Self;
1095
1096 fn sub(self, rhs: R) -> Self {
1097 Self::binary(BinaryExpressionOp::Minus, self, rhs)
1098 }
1099}
1100
1101impl<R: Into<Expression>> std::ops::Mul<R> for Expression {
1102 type Output = Self;
1103
1104 fn mul(self, rhs: R) -> Self {
1105 Self::binary(BinaryExpressionOp::Multiply, self, rhs)
1106 }
1107}
1108
1109impl<R: Into<Expression>> std::ops::Div<R> for Expression {
1110 type Output = Self;
1111
1112 fn div(self, rhs: R) -> Self {
1113 Self::binary(BinaryExpressionOp::Divide, self, rhs)
1114 }
1115}
1116
1117#[derive(Default)]
1119struct GetColumnReferences<'a>(HashSet<&'a ColumnName>);
1120
1121impl<'a> ExpressionTransform<'a> for GetColumnReferences<'a> {
1122 transform_output_type!(|'a, T| ());
1123
1124 fn transform_expr_column(&mut self, name: &'a ColumnName) {
1125 self.0.insert(name);
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use std::fmt::Debug;
1132
1133 use serde::de::DeserializeOwned;
1134 use serde::Serialize;
1135
1136 use super::{column_expr, column_pred, Expression as Expr, Predicate as Pred};
1137
1138 fn assert_roundtrip<T: Serialize + DeserializeOwned + PartialEq + Debug>(value: &T) {
1140 let json = serde_json::to_string(value).expect("serialization should succeed");
1141 let deserialized: T = serde_json::from_str(&json).expect("deserialization should succeed");
1142 assert_eq!(value, &deserialized, "roundtrip should preserve value");
1143 }
1144
/// Checks the `Display` output of several expressions.
#[test]
fn test_expression_format() {
    let column_case = (column_expr!("x"), "Column(x)");
    // Binary expressions render without parentheses.
    let arithmetic_case = (
        (column_expr!("x") + Expr::literal(4)) / Expr::literal(10) * Expr::literal(42),
        "Column(x) + 4 / 10 * 42",
    );
    let struct_case = (
        Expr::struct_from([column_expr!("x"), Expr::literal(2), Expr::literal(10)]),
        "Struct(Column(x), 2, 10)",
    );
    let array_case = (
        Expr::array([column_expr!("x"), column_expr!("y"), Expr::literal(0)]),
        "ARRAY(Column(x), Column(y), 0)",
    );

    for (expr, expected) in [column_case, arithmetic_case, struct_case, array_case] {
        assert_eq!(expr.to_string(), expected);
    }
}
1168
/// Checks the `Display` output of several predicates.
#[test]
fn test_predicate_format() {
    let x = || column_expr!("x");
    let cases = [
        (column_pred!("x"), "Column(x)"),
        (x().eq(Expr::literal(2)), "Column(x) = 2"),
        (
            (x() - Expr::literal(4)).lt(Expr::literal(10)),
            "Column(x) - 4 < 10",
        ),
        (
            Pred::and(x().ge(Expr::literal(2)), x().le(Expr::literal(10))),
            "AND(NOT(Column(x) < 2), NOT(Column(x) > 10))",
        ),
        (
            Pred::and_from([
                x().ge(Expr::literal(2)),
                x().le(Expr::literal(10)),
                x().le(Expr::literal(100)),
            ]),
            "AND(NOT(Column(x) < 2), NOT(Column(x) > 10), NOT(Column(x) > 100))",
        ),
        (
            Pred::or(x().gt(Expr::literal(2)), x().lt(Expr::literal(10))),
            "OR(Column(x) > 2, Column(x) < 10)",
        ),
        (x().eq(Expr::literal("foo")), "Column(x) = 'foo'"),
    ];

    for (pred, expected) in cases {
        assert_eq!(pred.to_string(), expected);
    }
}
1211
1212 mod serde_tests {
1215 use std::sync::Arc;
1216
1217 use super::assert_roundtrip;
1218 use crate::expressions::scalars::{ArrayData, DecimalData, MapData, StructData};
1219 use crate::expressions::{
1220 col, column_expr, column_name, lit, BinaryExpressionOp, BinaryPredicateOp, ColumnName,
1221 Expression, ExpressionStructPatchBuilder, Predicate, Scalar, UnaryExpressionOp,
1222 };
1223 use crate::schema::{ArrayType, DataType, DecimalType, MapType, StructField};
1224 use crate::utils::test_utils::assert_result_error_with_message;
1225
/// Round-trips literal expressions of each primitive scalar kind through JSON.
#[test]
fn test_literal_scalars_roundtrip() {
    let cases: Vec<Expression> = vec![
        Expression::literal(42i32),
        Expression::literal(9999999999i64),
        Expression::literal(123i16),
        Expression::literal(42i8),
        // `_f32` / `_f64` are type suffixes. A bare `_32` / `_64` is only a digit separator
        // followed by more digits, which made both values f64 and left the single-precision
        // case untested. The f32 value is kept to seven significant digits.
        // NOTE(review): assumes `Scalar` implements `From<f32>` -- confirm.
        Expression::literal(1.123457_f32),
        Expression::literal(1.12345667_f64),
        Expression::literal("hello world"),
        Expression::literal(true),
        Expression::literal(false),
        Expression::Literal(Scalar::Timestamp(1234567890000000)),
        Expression::Literal(Scalar::TimestampNtz(1234567890000000)),
        Expression::Literal(Scalar::Date(19000)),
        Expression::Literal(Scalar::Binary(vec![1, 2, 3, 4, 5])),
        Expression::Literal(Scalar::Decimal(
            DecimalData::try_new(12345i128, DecimalType::try_new(10, 2).unwrap()).unwrap(),
        )),
    ];

    for expr in &cases {
        assert_roundtrip(expr);
    }
}
1259
/// Round-trips NULL, array, map and struct literal expressions through JSON.
#[test]
fn test_literal_complex_scalars_roundtrip() {
    let array_literal = Expression::Literal(Scalar::Array(
        ArrayData::try_new(
            ArrayType::new(DataType::INTEGER, false),
            vec![Scalar::Integer(1), Scalar::Integer(2), Scalar::Integer(3)],
        )
        .unwrap(),
    ));
    let map_literal = Expression::Literal(Scalar::Map(
        MapData::try_new(
            MapType::new(DataType::STRING, DataType::INTEGER, false),
            vec![
                (Scalar::String("a".to_string()), Scalar::Integer(1)),
                (Scalar::String("b".to_string()), Scalar::Integer(2)),
            ],
        )
        .unwrap(),
    ));
    let struct_literal = Expression::Literal(Scalar::Struct(
        StructData::try_new(
            vec![
                StructField::nullable("x", DataType::INTEGER),
                StructField::nullable("y", DataType::STRING),
            ],
            vec![Scalar::Integer(42), Scalar::String("hello".to_string())],
        )
        .unwrap(),
    ));

    let cases: Vec<Expression> = vec![
        Expression::null_literal(DataType::INTEGER),
        Expression::null_literal(DataType::STRING),
        Expression::null_literal(DataType::BOOLEAN),
        array_literal,
        map_literal,
        struct_literal,
    ];

    cases.iter().for_each(assert_roundtrip);
}
1304
/// Round-trips single-part and multi-part column expressions through JSON.
#[test]
fn test_column_expressions_roundtrip() {
    let simple = column_expr!("my_column");
    let two_parts = Expression::column(["parent", "child"]);
    let four_parts = Expression::column(["a", "b", "c", "d"]);

    for expr in [&simple, &two_parts, &four_parts] {
        assert_roundtrip(expr);
    }
}
1319
/// Round-trips simple, multi-part and default column names through JSON.
#[test]
fn test_column_names_roundtrip() {
    let simple = column_name!("simple");
    let nested = ColumnName::new(["a", "b", "c"]);
    let default_name = ColumnName::default();

    for name in [&simple, &nested, &default_name] {
        assert_roundtrip(name);
    }
}
1332
/// Round-trips a `ToJson` unary expression through JSON.
#[test]
fn test_unary_expression_roundtrip() {
    let operand = column_expr!("data");
    assert_roundtrip(&Expression::unary(UnaryExpressionOp::ToJson, operand));
}
1340
/// Round-trips one binary expression per arithmetic operator through JSON.
#[test]
fn test_binary_expressions_roundtrip() {
    for op in [
        BinaryExpressionOp::Plus,
        BinaryExpressionOp::Minus,
        BinaryExpressionOp::Multiply,
        BinaryExpressionOp::Divide,
    ] {
        let lhs = column_expr!("a");
        let rhs = Expression::literal(10);
        assert_roundtrip(&Expression::binary(op, lhs, rhs));
    }
}
1355
/// Round-trips a `COALESCE` expression through JSON.
#[test]
fn test_variadic_expression_roundtrip() {
    let operands = [
        column_expr!("a"),
        column_expr!("b"),
        Expression::literal("default"),
    ];
    assert_roundtrip(&Expression::coalesce(operands));
}
1365
1366 #[rstest::rstest]
1367 #[case::array_single(Expression::array([Expression::literal(7i32)]))]
1368 #[case::array_mixed(Expression::array([
1369 column_expr!("a"),
1370 column_expr!("b"),
1371 Expression::literal(42i64),
1372 ]))]
1373 fn test_array_expression_roundtrip(#[case] expr: Expression) {
1374 assert_roundtrip(&expr);
1375 }
1376
/// Round-trips the nested expression `((a + b) * (c - d)) / 2` through JSON.
#[test]
fn test_nested_arithmetic_expression_roundtrip() {
    let sum = Expression::binary(
        BinaryExpressionOp::Plus,
        column_expr!("a"),
        column_expr!("b"),
    );
    let difference = Expression::binary(
        BinaryExpressionOp::Minus,
        column_expr!("c"),
        column_expr!("d"),
    );
    let quotient = Expression::binary(
        BinaryExpressionOp::Divide,
        Expression::binary(BinaryExpressionOp::Multiply, sum, difference),
        Expression::literal(2),
    );
    assert_roundtrip(&quotient);
}
1394
/// Round-trips a struct expression with column and literal fields through JSON.
#[test]
fn test_struct_expression_roundtrip() {
    let fields = [
        Arc::new(column_expr!("x")),
        Arc::new(Expression::literal(42)),
        Arc::new(Expression::literal("hello")),
    ];
    assert_roundtrip(&Expression::struct_from(fields));
}
1406
/// Round-trips struct-patch expressions (empty, drop, replace, insert/prepend/append and
/// nested) through JSON.
#[test]
fn test_transform_expressions_roundtrip() {
    let empty = ExpressionStructPatchBuilder::new();
    let drop_one = ExpressionStructPatchBuilder::new().drop("old_column");
    let replace_one = ExpressionStructPatchBuilder::new().replace("original", lit(0));
    let insertions = ExpressionStructPatchBuilder::new()
        .insert_after("after_col", col!("new_col"))
        .prepend(lit("prepended"))
        .append(lit("appended"));
    let nested = ExpressionStructPatchBuilder::new_nested(["parent", "child"]).drop("to_drop");

    let cases: Vec<Expression> = vec![
        Expression::struct_patch(empty).unwrap(),
        Expression::struct_patch(drop_one).unwrap(),
        Expression::struct_patch(replace_one).unwrap(),
        Expression::struct_patch(insertions).unwrap(),
        Expression::struct_patch(nested).unwrap(),
    ];

    cases.iter().for_each(assert_roundtrip);
}
1439
/// Round-trips an expression that wraps an equality predicate through JSON.
#[test]
fn test_expression_wrapping_predicate_roundtrip() {
    let wrapped =
        Expression::from_pred(Predicate::eq(column_expr!("x"), Expression::literal(10)));
    assert_roundtrip(&wrapped);
}
1446
/// Round-trips an unknown expression through JSON.
#[test]
fn test_expression_unknown_roundtrip() {
    assert_roundtrip(&Expression::unknown("some_unknown_function()"));
}
1452
/// Round-trips map-to-struct expressions over a column and over a literal through JSON.
#[test]
fn test_map_to_struct_expression_roundtrip() {
    let over_column = Expression::map_to_struct(column_expr!("pv"));
    let over_literal = Expression::map_to_struct(Expression::literal("ignored"));

    for expr in [&over_column, &over_literal] {
        assert_roundtrip(expr);
    }
}
1464
/// Hands a set of simple predicates to `assert_roundtrip`: an expression-backed predicate, both
/// boolean literals, single and double negation, an unknown predicate, and the null checks.
#[test]
fn test_predicate_basics_roundtrip() {
    // NOT(NOT(x > 5)), assembled step by step.
    let x_gt_five = Predicate::gt(column_expr!("x"), Expression::literal(5));
    let double_negation = Predicate::not(Predicate::not(x_gt_five));

    let predicates = [
        Predicate::from_expr(column_expr!("is_active")),
        Predicate::literal(true),
        Predicate::literal(false),
        Predicate::not(Predicate::from_expr(column_expr!("x"))),
        double_negation,
        Predicate::unknown("some_unknown_predicate()"),
        Predicate::is_null(column_expr!("nullable_col")),
        Predicate::is_not_null(column_expr!("nullable_col")),
    ];

    for predicate in predicates.iter() {
        assert_roundtrip(predicate);
    }
}
1493
/// Hands the value produced by `Predicate::null_literal` to `assert_roundtrip`.
#[test]
fn test_predicate_null_literal_roundtrip() {
    assert_roundtrip(&Predicate::null_literal());
}
1499
/// Hands one predicate per comparison constructor (`eq`, `ne`, `lt`, `le`, `gt`, `ge`,
/// `distinct`) to `assert_roundtrip`.
#[test]
fn test_predicate_comparisons_roundtrip() {
    // The array is fully built before the first check runs.
    let comparisons = [
        Predicate::eq(column_expr!("x"), Expression::literal(42)),
        Predicate::ne(column_expr!("status"), Expression::literal("active")),
        Predicate::lt(column_expr!("age"), Expression::literal(18)),
        Predicate::le(column_expr!("price"), Expression::literal(100)),
        Predicate::gt(column_expr!("score"), Expression::literal(90)),
        Predicate::ge(column_expr!("quantity"), Expression::literal(1)),
        Predicate::distinct(column_expr!("a"), column_expr!("b")),
    ];

    for comparison in comparisons {
        assert_roundtrip(&comparison);
    }
}
1516
/// Hands an `In` predicate to `assert_roundtrip`: column `x` against an array literal holding
/// the integers 1, 2 and 3.
#[test]
fn test_predicate_in_roundtrip() {
    let array_type = ArrayType::new(DataType::INTEGER, false);
    let elements: Vec<Scalar> = (1..=3).map(Scalar::Integer).collect();
    let haystack = ArrayData::try_new(array_type, elements).unwrap();

    let needle = column_expr!("x");
    let in_list = Expression::Literal(Scalar::Array(haystack));
    let in_pred = Predicate::binary(BinaryPredicateOp::In, needle, in_list);
    assert_roundtrip(&in_pred);
}
1531
/// Hands AND/OR junction predicates to `assert_roundtrip`: the two-argument `and`/`or`
/// constructors, the iterator-based `and_from`/`or_from` constructors, and an OR that nests
/// an AND.
#[test]
fn test_predicate_junctions_roundtrip() {
    let binary_and = Predicate::and(
        Predicate::gt(column_expr!("x"), Expression::literal(0)),
        Predicate::lt(column_expr!("x"), Expression::literal(100)),
    );
    let binary_or = Predicate::or(
        Predicate::eq(column_expr!("status"), Expression::literal("active")),
        Predicate::eq(column_expr!("status"), Expression::literal("pending")),
    );
    let ternary_and = Predicate::and_from([
        Predicate::gt(column_expr!("x"), Expression::literal(0)),
        Predicate::lt(column_expr!("x"), Expression::literal(100)),
        Predicate::is_not_null(column_expr!("x")),
    ]);
    let ternary_or = Predicate::or_from([
        Predicate::eq(column_expr!("type"), Expression::literal("A")),
        Predicate::eq(column_expr!("type"), Expression::literal("B")),
        Predicate::eq(column_expr!("type"), Expression::literal("C")),
    ]);

    // (a > 0 AND b < 100) OR c = "special"
    let range_check = Predicate::and(
        Predicate::gt(column_expr!("a"), Expression::literal(0)),
        Predicate::lt(column_expr!("b"), Expression::literal(100)),
    );
    let special_case = Predicate::eq(column_expr!("c"), Expression::literal("special"));
    let nested = Predicate::or(range_check, special_case);

    let junctions = [binary_and, binary_or, ternary_and, ternary_or, nested];
    for junction in junctions.iter() {
        assert_roundtrip(junction);
    }
}
1571
/// Hands two nested structures to `assert_roundtrip`: a predicate whose operand is a multi-level
/// expression tree, and an expression that wraps a compound predicate.
#[test]
fn test_deeply_nested_structures_roundtrip() {
    // Predicate over an expression tree: coalesce(a + b, c * d, 0) > 100.
    let sum = Expression::binary(BinaryExpressionOp::Plus, column_expr!("a"), column_expr!("b"));
    let product = Expression::binary(
        BinaryExpressionOp::Multiply,
        column_expr!("c"),
        column_expr!("d"),
    );
    let first_non_null = Expression::coalesce([sum, product, Expression::literal(0)]);
    let threshold_pred = Predicate::gt(first_non_null, Expression::literal(100));
    assert_roundtrip(&threshold_pred);

    // Expression over a predicate tree: (x = 1) AND (y + z > 10).
    let x_is_one = Predicate::eq(column_expr!("x"), Expression::literal(1));
    let y_plus_z = Expression::binary(
        BinaryExpressionOp::Plus,
        column_expr!("y"),
        column_expr!("z"),
    );
    let sum_exceeds_ten = Predicate::gt(y_plus_z, Expression::literal(10));
    let wrapped = Expression::from_pred(Predicate::and(x_is_one, sum_exceeds_ten));
    assert_roundtrip(&wrapped);
}
1606
/// Serializes an expression built with `Expression::opaque` using `serde_json::to_string` and
/// checks, via `assert_result_error_with_message`, that the result is an error with the message
/// "Cannot serialize an Opaque Expression".
#[test]
fn test_opaque_expression_serialize_fails() {
    use crate::expressions::{OpaqueExpressionOp, ScalarExpressionEvaluator};
    use crate::DeltaResult;

    // Minimal stand-in op: a fixed name and a constant evaluation result.
    #[derive(Debug, PartialEq)]
    struct TestOpaqueExprOp;

    impl OpaqueExpressionOp for TestOpaqueExprOp {
        fn name(&self) -> &str {
            "test_opaque"
        }

        fn eval_expr_scalar(
            &self,
            _evaluator: &ScalarExpressionEvaluator<'_>,
            _args: &[Expression],
        ) -> DeltaResult<Scalar> {
            Ok(Scalar::Integer(0))
        }
    }

    let opaque = Expression::opaque(TestOpaqueExprOp, [Expression::literal(1)]);
    assert_result_error_with_message(
        serde_json::to_string(&opaque),
        "Cannot serialize an Opaque Expression",
    );
}
1634
/// Serializes a predicate built with `Predicate::opaque` using `serde_json::to_string` and
/// checks, via `assert_result_error_with_message`, that the result is an error with the message
/// "Cannot serialize an Opaque Predicate".
#[test]
fn test_opaque_predicate_serialize_fails() {
    use crate::expressions::{OpaquePredicateOp, ScalarExpressionEvaluator};
    use crate::kernel_predicates::{
        DirectDataSkippingPredicateEvaluator, DirectPredicateEvaluator,
        IndirectDataSkippingPredicateEvaluator,
    };
    use crate::DeltaResult;

    // Minimal stand-in op: every hook ignores its arguments and returns a constant.
    #[derive(Debug, PartialEq)]
    struct TestOpaquePredOp;

    impl OpaquePredicateOp for TestOpaquePredOp {
        fn name(&self) -> &str {
            "test_opaque_pred"
        }

        fn eval_pred_scalar(
            &self,
            _expr_evaluator: &ScalarExpressionEvaluator<'_>,
            _pred_evaluator: &DirectPredicateEvaluator<'_>,
            _args: &[Expression],
            _negated: bool,
        ) -> DeltaResult<Option<bool>> {
            Ok(Some(true))
        }

        fn eval_as_data_skipping_predicate(
            &self,
            _skipping_evaluator: &DirectDataSkippingPredicateEvaluator<'_>,
            _args: &[Expression],
            _negated: bool,
        ) -> Option<bool> {
            Some(true)
        }

        fn as_data_skipping_predicate(
            &self,
            _skipping_evaluator: &IndirectDataSkippingPredicateEvaluator<'_>,
            _args: &[Expression],
            _negated: bool,
        ) -> Option<Predicate> {
            None
        }
    }

    let opaque = Predicate::opaque(TestOpaquePredOp, [Expression::literal(1)]);
    assert_result_error_with_message(
        serde_json::to_string(&opaque),
        "Cannot serialize an Opaque Predicate",
    );
}
1682 }
1683
/// `Pred::and_from` over a single predicate must compare equal to that predicate itself.
#[test]
fn single_element_and_from_returns_unwrapped_predicate() {
    let only = Pred::gt(column_expr!("x"), Expr::literal(0));
    assert_eq!(Pred::and_from([only.clone()]), only);
}
1690
/// `Pred::or_from` over a single predicate must compare equal to that predicate itself.
#[test]
fn single_element_or_from_returns_unwrapped_predicate() {
    let only = Pred::gt(column_expr!("x"), Expr::literal(0));
    assert_eq!(Pred::or_from([only.clone()]), only);
}
1697
/// `Pred::and_from` over two predicates must yield a `Pred::Junction` holding two children, and
/// that junction must compare equal to `Pred::and` of the same two predicates.
#[test]
fn multi_element_and_from_returns_junction() {
    let lower_bound = Pred::gt(column_expr!("x"), Expr::literal(0));
    let upper_bound = Pred::lt(column_expr!("x"), Expr::literal(100));

    let combined = Pred::and_from([lower_bound.clone(), upper_bound.clone()]);

    assert!(matches!(&combined, Pred::Junction(junction) if junction.preds.len() == 2));
    assert_eq!(combined, Pred::and(lower_bound, upper_bound));
}
1706
/// `Pred::and_from` over an empty iterator must compare equal to the literal `true`.
#[test]
fn empty_and_from_returns_identity_literal() {
    assert_eq!(Pred::and_from(std::iter::empty()), Pred::literal(true));
}
1712
/// `Pred::or_from` over an empty iterator must compare equal to the literal `false`.
#[test]
fn empty_or_from_returns_identity_literal() {
    assert_eq!(Pred::or_from(std::iter::empty()), Pred::literal(false));
}
1718}