Skip to main content

delta_kernel/expressions/
mod.rs

1//! Definitions and functions to create and manipulate kernel expressions
2
3use std::collections::HashSet;
4use std::fmt::{Display, Formatter};
5use std::sync::Arc;
6
7use itertools::Itertools;
8use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
9
10pub use self::column_names::{
11    col, column_expr, column_expr_ref, column_name, column_pred, joined_column_expr,
12    joined_column_name, ColumnName,
13};
14pub use self::scalars::{ArrayData, DecimalData, MapData, Scalar, StructData};
15use crate::kernel_predicates::{
16    DirectDataSkippingPredicateEvaluator, DirectPredicateEvaluator,
17    IndirectDataSkippingPredicateEvaluator,
18};
19use crate::schema::SchemaRef;
20pub use crate::struct_patch::{ExpressionFieldPatch, ExpressionStructPatch};
21use crate::transforms::{transform_output_type, ExpressionTransform};
22use crate::utils::CollectInto;
23use crate::{DataType, DeltaResult, DynPartialEq, Error};
24
25mod column_names;
26pub(crate) mod literal_expression_transform;
27pub(crate) use literal_expression_transform::literal_expression_transform;
28mod scalars;
29#[cfg(feature = "column-defaults-in-dev")]
30mod sql;
31#[cfg(feature = "column-defaults-in-dev")]
32// TODO(#2630): Wire up `parse_sql` to column-defaults and remove this allow
33#[allow(unused_imports)]
34pub(crate) use self::sql::parse_sql;
35
36pub type ExpressionRef = std::sync::Arc<Expression>;
37pub type PredicateRef = std::sync::Arc<Predicate>;
38
39/// Build an [`Expression::Literal`] from anything that converts into a [`Scalar`].
40///
41/// Concise alternative to [`Expression::literal`] for plan builders. Accepts the same value
42/// types [`Scalar`] does (`i32`, `i64`, `&str`, `bool`, ...).
43///
44/// ```
45/// use delta_kernel::expressions::lit;
46/// let _zero = lit(0i64);
47/// ```
48pub fn lit(value: impl Into<Scalar>) -> Expression {
49    Expression::literal(value)
50}
51
/// A [`StructPatchBuilder`](crate::struct_patch::StructPatchBuilder) specialized so that the items
/// it emits are [`ExpressionRef`]s. Its output is lowered into an [`ExpressionStructPatch`], which
/// can then be embedded in an [`Expression`] (see [`Expression::struct_patch`]).
pub type ExpressionStructPatchBuilder = crate::struct_patch::StructPatchBuilder<ExpressionRef>;
56
57////////////////////////////////////////////////////////////////////////
58// Operators
59////////////////////////////////////////////////////////////////////////
60
61/// A unary predicate operator.
62#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
63pub enum UnaryPredicateOp {
64    /// Unary Is Null
65    IsNull,
66}
67
/// A binary predicate operator.
///
/// Only `<`, `>` and `=` are represented directly. The `<=`, `>=` and `!=` comparisons are
/// expressed by wrapping one of these in `NOT` (see e.g. [`Predicate::le`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum BinaryPredicateOp {
    /// Comparison Less Than (`left < right`)
    LessThan,
    /// Comparison Greater Than (`left > right`)
    GreaterThan,
    /// Comparison Equal (`left = right`)
    Equal,
    /// Distinct (`DISTINCT(left, right)`); tolerates NULL input
    Distinct,
    /// IN; tolerates NULL input
    In,
}
82
/// A unary expression operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum UnaryExpressionOp {
    /// Convert struct data to JSON-encoded strings. The inverse operation is
    /// [`Expression::ParseJson`].
    ToJson,
}
89
/// A binary (arithmetic) expression operator, applied as `lhs OP rhs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum BinaryExpressionOp {
    /// Arithmetic Plus (`lhs + rhs`)
    Plus,
    /// Arithmetic Minus (`lhs - rhs`)
    Minus,
    /// Arithmetic Multiply (`lhs * rhs`)
    Multiply,
    /// Arithmetic Divide (`lhs / rhs`)
    Divide,
}
102
/// A variadic expression operator, i.e. one that takes a variable number of input expressions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum VariadicExpressionOp {
    /// Collapse multiple values into one by taking the first non-null value. If every input is
    /// null, the result is null.
    Coalesce,
    /// Construct an Array by evaluating each input expression. For example, the expression
    /// `Array(1, (1 + 2), col("my_int_col"))` evaluates to the array
    /// `[1, 3, <my_int_col value>]` per row. All inputs must share the same element type.
    /// Requires at least one element; the element type is inferred from the inputs.
    ///
    /// For static array literals whose elements are all compile-time constants, use
    /// [`Scalar::Array`] instead. The difference is that `Array` is evaluated at runtime, while
    /// `Scalar::Array` is evaluated at compile time.
    Array,
}
118
119/// A junction (AND/OR) predicate operator.
120#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
121pub enum JunctionPredicateOp {
122    /// Conjunction
123    And,
124    /// Disjunction
125    Or,
126}
127
/// A kernel-supplied scalar expression evaluator which in particular can convert column references
/// (i.e. [`Expression::Column`]) to [`Scalar`] values. [`OpaqueExpressionOp::eval_expr_scalar`] and
/// [`OpaquePredicateOp::eval_pred_scalar`] rely on this evaluator.
///
/// The evaluator is a borrowed closure taking an [`Expression`] and returning `Option<Scalar>`:
///
/// - `None` means kernel was unable to evaluate the input expression.
/// - `Some(Scalar)` is the result of that evaluation (possibly `Scalar::Null` if the output was
///   NULL).
pub type ScalarExpressionEvaluator<'a> = dyn Fn(&Expression) -> Option<Scalar> + 'a;
136
/// An opaque expression operation (i.e. defined and implemented by the engine).
///
/// Kernel interacts with an opaque expression only through the methods of this trait.
pub trait OpaqueExpressionOp: DynPartialEq + std::fmt::Debug {
    /// Succinctly identifies this op
    fn name(&self) -> &str;

    /// Attempts scalar evaluation of this opaque expression, e.g. for partition pruning.
    ///
    /// `exprs` are the child (input) expressions of the operation. Implementations can evaluate
    /// them however they see fit, possibly by calling back to the provided
    /// [`ScalarExpressionEvaluator`] (`eval_expr`).
    ///
    /// An output of `Err` indicates that this operation does not support scalar evaluation, or was
    /// invoked incorrectly (e.g. with the wrong number and/or types of arguments, None input,
    /// etc); the operation is disqualified from participating in partition pruning.
    ///
    /// `Ok(Scalar::Null)` means the operation actually produced a legitimately NULL result.
    fn eval_expr_scalar(
        &self,
        eval_expr: &ScalarExpressionEvaluator<'_>,
        exprs: &[Expression],
    ) -> DeltaResult<Scalar>;
}
158
/// An opaque predicate operation (i.e. defined and implemented by the engine).
///
/// Kernel interacts with an opaque predicate only through the methods of this trait. In every
/// method below, `exprs` are the child (input) expressions of the predicate and `inverted` tells
/// the implementation whether the predicate is to be treated as inverted.
pub trait OpaquePredicateOp: DynPartialEq + std::fmt::Debug {
    /// Succinctly identifies this op
    fn name(&self) -> &str;

    /// Attempts scalar evaluation of this (possibly inverted) opaque predicate on behalf of a
    /// [`DirectPredicateEvaluator`], e.g. for partition pruning or to evaluate an opaque data
    /// skipping predicate produced previously by an [`IndirectDataSkippingPredicateEvaluator`].
    ///
    /// Implementations can evaluate the child expressions however they see fit, possibly by calling
    /// back to the provided [`ScalarExpressionEvaluator`] and/or [`DirectPredicateEvaluator`].
    ///
    /// An output of `Err` indicates that this operation does not support scalar evaluation, or was
    /// invoked incorrectly (e.g. wrong number and/or types of arguments, None input, etc); the
    /// operation is disqualified from participating in partition pruning and/or data skipping.
    ///
    /// `Ok(None)` means the operation actually produced a legitimately NULL output.
    fn eval_pred_scalar(
        &self,
        eval_expr: &ScalarExpressionEvaluator<'_>,
        eval_pred: &DirectPredicateEvaluator<'_>,
        exprs: &[Expression],
        inverted: bool,
    ) -> DeltaResult<Option<bool>>;

    /// Evaluates this (possibly inverted) opaque predicate for data skipping on behalf of a
    /// [`DirectDataSkippingPredicateEvaluator`], e.g. for parquet row group skipping.
    ///
    /// Implementations can evaluate the child expressions however they see fit, possibly by
    /// calling back to the provided [`DirectDataSkippingPredicateEvaluator`].
    ///
    /// An output of `None` indicates that this operation does not support evaluation as a data
    /// skipping predicate, or was invoked incorrectly (e.g. wrong number and/or types of arguments,
    /// None input, etc.); the operation is disqualified from participating in row group skipping.
    fn eval_as_data_skipping_predicate(
        &self,
        evaluator: &DirectDataSkippingPredicateEvaluator<'_>,
        exprs: &[Expression],
        inverted: bool,
    ) -> Option<bool>;

    /// Converts this (possibly inverted) opaque predicate to a data skipping predicate on behalf of
    /// an [`IndirectDataSkippingPredicateEvaluator`], e.g. for stats-based file pruning.
    ///
    /// Implementations can transform the predicate and its child expressions however they see fit,
    /// possibly by calling back to the owning [`IndirectDataSkippingPredicateEvaluator`].
    ///
    /// An output of `None` indicates that this operation does not support conversion to a data
    /// skipping predicate, or was invoked incorrectly (e.g. wrong number and/or types of arguments,
    /// None input, etc.); the operation is disqualified from participating in file pruning.
    //
    // NOTE: It would be nicer if this method could accept an `Arc<Self>`, in case the data skipping
    // predicate rewrite can reuse the same operation. But sadly, that would not be dyn-compatible.
    fn as_data_skipping_predicate(
        &self,
        evaluator: &IndirectDataSkippingPredicateEvaluator<'_>,
        exprs: &[Expression],
        inverted: bool,
    ) -> Option<Predicate>;
}
219
/// A shared reference to an [`OpaqueExpressionOp`] instance. Cloning it clones the `Arc`, not the
/// underlying operation.
pub type OpaqueExpressionOpRef = Arc<dyn OpaqueExpressionOp>;

/// A shared reference to an [`OpaquePredicateOp`] instance. Cloning it clones the `Arc`, not the
/// underlying operation.
pub type OpaquePredicateOpRef = Arc<dyn OpaquePredicateOp>;
225
226////////////////////////////////////////////////////////////////////////
227// Expressions and predicates
228////////////////////////////////////////////////////////////////////////
229
/// A predicate that applies a [`UnaryPredicateOp`] to a single input expression.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct UnaryPredicate {
    /// The operator.
    pub op: UnaryPredicateOp,
    /// The input expression.
    pub expr: Box<Expression>,
}
237
/// A predicate that applies a [`BinaryPredicateOp`] to two input expressions (`left OP right`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BinaryPredicate {
    /// The operator.
    pub op: BinaryPredicateOp,
    /// The left-hand side of the operation.
    pub left: Box<Expression>,
    /// The right-hand side of the operation.
    pub right: Box<Expression>,
}
247
/// An expression that applies a [`UnaryExpressionOp`] to a single input expression.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct UnaryExpression {
    /// The operator.
    pub op: UnaryExpressionOp,
    /// The input expression.
    pub expr: Box<Expression>,
}
255
/// An expression that applies a [`BinaryExpressionOp`] to two input expressions
/// (`left OP right`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BinaryExpression {
    /// The operator.
    pub op: BinaryExpressionOp,
    /// The left-hand side of the operation.
    pub left: Box<Expression>,
    /// The right-hand side of the operation.
    pub right: Box<Expression>,
}
265
/// An expression that applies a [`VariadicExpressionOp`] to a variable number of input
/// expressions.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct VariadicExpression {
    /// The operator.
    pub op: VariadicExpressionOp,
    /// The input expressions, in the order they were supplied.
    pub exprs: Vec<Expression>,
}
273
/// An expression that parses a JSON string into a struct with the given schema.
///
/// This is the inverse of [`UnaryExpressionOp::ToJson`]: it converts a JSON-encoded string column
/// into a struct column.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ParseJsonExpression {
    /// The expression that evaluates to a STRING column containing JSON objects.
    pub json_expr: Box<Expression>,
    /// The schema defining the structure to parse the JSON into.
    pub output_schema: SchemaRef,
}
284
/// A predicate that combines any number of input predicates with a [`JunctionPredicateOp`]
/// (AND/OR).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct JunctionPredicate {
    /// The operator.
    pub op: JunctionPredicateOp,
    /// The input predicates.
    pub preds: Vec<Predicate>,
}
292
// NOTE: We have to use `Arc<dyn OpaquePredicateOp>` instead of `Box<dyn OpaquePredicateOp>` because
// we cannot require `OpaquePredicateOp: Clone` (not a dyn-compatible trait). Instead, we must rely
// on cheap `Arc` clone, which does not duplicate the inner object.
//
// TODO(#1564): OpaquePredicate currently does not support serialization or deserialization. In the
// future, the [`OpaquePredicateOp`] trait can be extended to support ser/de.
/// An engine-defined predicate: an [`OpaquePredicateOp`] together with its input expressions.
///
/// Attempting to serialize or deserialize a [`Predicate::Opaque`] holding this type fails with an
/// error.
#[derive(Clone, Debug)]
pub struct OpaquePredicate {
    /// The engine-provided operation.
    pub op: OpaquePredicateOpRef,
    /// The input expressions handed to the operation.
    pub exprs: Vec<Expression>,
}
/// Serde `serialize_with` hook for [`Predicate::Opaque`].
///
/// Opaque predicates cannot be serialized, so this ignores both arguments and always returns a
/// custom serializer error.
fn fail_serialize_opaque_predicate<S>(
    _value: &OpaquePredicate,
    _serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    Err(ser::Error::custom("Cannot serialize an Opaque Predicate"))
}
313
/// Serde `deserialize_with` hook for [`Predicate::Opaque`].
///
/// Opaque predicates cannot be deserialized, so this ignores the deserializer and always returns
/// a custom deserializer error.
fn fail_deserialize_opaque_predicate<'de, D>(_deserializer: D) -> Result<OpaquePredicate, D::Error>
where
    D: Deserializer<'de>,
{
    Err(de::Error::custom("Cannot deserialize an Opaque Predicate"))
}
320
321impl OpaquePredicate {
322    pub(crate) fn new(
323        op: OpaquePredicateOpRef,
324        exprs: impl IntoIterator<Item = Expression>,
325    ) -> Self {
326        let exprs = exprs.into_iter().collect();
327        Self { op, exprs }
328    }
329}
330
// NOTE: We have to use `Arc<dyn OpaqueExpressionOp>` instead of `Box<dyn OpaqueExpressionOp>`
// because we cannot require `OpaqueExpressionOp: Clone` (not a dyn-compatible trait). Instead, we
// must rely on cheap `Arc` clone, which does not duplicate the inner object.
//
// TODO(#1564): OpaqueExpression currently does not support serialization or deserialization. In the
// future, the [`OpaqueExpressionOp`] trait can be extended to support ser/de.
/// An engine-defined expression: an [`OpaqueExpressionOp`] together with its input expressions.
///
/// Attempting to serialize or deserialize an [`Expression::Opaque`] holding this type fails with
/// an error.
#[derive(Clone, Debug)]
pub struct OpaqueExpression {
    /// The engine-provided operation.
    pub op: OpaqueExpressionOpRef,
    /// The input expressions handed to the operation.
    pub exprs: Vec<Expression>,
}
342
343impl OpaqueExpression {
344    pub(crate) fn new(
345        op: OpaqueExpressionOpRef,
346        exprs: impl IntoIterator<Item = Expression>,
347    ) -> Self {
348        let exprs = exprs.into_iter().collect();
349        Self { op, exprs }
350    }
351}
352
/// Serde `serialize_with` hook for [`Expression::Opaque`].
///
/// Opaque expressions cannot be serialized, so this ignores both arguments and always returns a
/// custom serializer error.
fn fail_serialize_opaque_expression<S>(
    _value: &OpaqueExpression,
    _serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    Err(ser::Error::custom("Cannot serialize an Opaque Expression"))
}
362
/// Serde `deserialize_with` hook for [`Expression::Opaque`].
///
/// Opaque expressions cannot be deserialized, so this ignores the deserializer and always returns
/// a custom deserializer error.
fn fail_deserialize_opaque_expression<'de, D>(
    _deserializer: D,
) -> Result<OpaqueExpression, D::Error>
where
    D: Deserializer<'de>,
{
    Err(de::Error::custom("Cannot deserialize an Opaque Expression"))
}
371
/// A SQL expression.
///
/// These expressions do not track or validate data types, other than the type
/// of literals. It is up to the expression evaluator to validate the
/// expression against a schema and add appropriate casts as required.
///
/// All variants except [`Expression::Opaque`] support serde serialization and deserialization;
/// attempting either on an opaque expression produces an error.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Expression {
    /// A literal value.
    Literal(Scalar),
    /// A column reference by name.
    Column(ColumnName),
    /// A predicate treated as a boolean expression
    Predicate(Box<Predicate>), // should this be Arc?
    /// A struct computed from a Vec of expressions.
    /// The optional nullability predicate, if provided and evaluates to false/null, makes the
    /// entire struct null.
    Struct(Vec<ExpressionRef>, Option<ExpressionRef>),
    /// A sparse patch of a struct. More efficient than `Struct` for wide schemas
    /// where only a few fields change, achieving O(changes) instead of O(schema_width) complexity.
    ///
    /// When deserializing, the variant name `Transform` is accepted as an alias.
    #[serde(alias = "Transform")]
    StructPatch(ExpressionStructPatch),
    /// An expression that takes one expression as input.
    Unary(UnaryExpression),
    /// An expression that takes two expressions as input.
    Binary(BinaryExpression),
    /// An expression that takes a variable number of expressions as input.
    Variadic(VariadicExpression),
    /// An expression that the engine defines and implements. Kernel interacts with the expression
    /// only through methods provided by the [`OpaqueExpressionOp`] trait.
    ///
    /// Serialization and deserialization of this variant always fail.
    #[serde(serialize_with = "fail_serialize_opaque_expression")]
    #[serde(deserialize_with = "fail_deserialize_opaque_expression")]
    Opaque(OpaqueExpression),
    /// An unknown expression (i.e. one that neither kernel nor engine attempts to evaluate). For
    /// data skipping purposes, kernel treats unknown expressions as if they were literal NULL
    /// values (which may disable skipping if it "poisons" the predicate), but engines MUST NOT
    /// attempt to interpret them as NULL when evaluating query filters because it could produce
    /// incorrect results. For example, converting `WHERE <fancy-udf-invocation> IS NULL` to `WHERE
    /// <unknown> IS NULL` to `WHERE NULL IS NULL` is equivalent to `WHERE TRUE` and would include
    /// all rows -- almost certainly NOT what the query author intended. Use `Expression::Opaque`
    /// for expressions kernel doesn't understand but which engine can still evaluate.
    Unknown(String),
    /// Parse a JSON string expression into a struct with the given schema.
    ParseJson(ParseJsonExpression),
    /// Extract keys from a `Map<String, String>` and parse values into a typed struct using
    /// Delta's partition value serialization rules.
    MapToStruct(MapToStructExpression),
}
419
/// A SQL predicate.
///
/// These predicates do not track or validate data types, other than the type
/// of literals. It is up to the predicate evaluator to validate the
/// predicate against a schema and add appropriate casts as required.
///
/// All variants except [`Predicate::Opaque`] support serde serialization and deserialization;
/// attempting either on an opaque predicate produces an error.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Predicate {
    /// A boolean-valued expression, useful for e.g. `AND(<boolean_col1>, <boolean_col2>)`.
    BooleanExpression(Expression),
    /// Boolean inversion (true <-> false)
    ///
    /// NOTE: NOT is not a normal unary predicate, because it requires a predicate as input (not an
    /// expression), and is never directly evaluated. Instead, observing that all predicates are
    /// invertible, NOT is always pushed down into its child predicate, inverting it. For example,
    /// `NOT (a < b)` pushes down and inverts `<` to `>=`, producing `a >= b`.
    Not(Box<Predicate>),
    /// A unary operation.
    Unary(UnaryPredicate),
    /// A binary operation.
    Binary(BinaryPredicate),
    /// A junction operation (AND/OR).
    Junction(JunctionPredicate),
    /// A predicate that the engine defines and implements. Kernel interacts with the predicate
    /// only through methods provided by the [`OpaquePredicateOp`] trait.
    ///
    /// Serialization and deserialization of this variant always fail.
    #[serde(serialize_with = "fail_serialize_opaque_predicate")]
    #[serde(deserialize_with = "fail_deserialize_opaque_predicate")]
    Opaque(OpaquePredicate),
    /// An unknown predicate (i.e. one that neither kernel nor engine attempts to evaluate). For
    /// data skipping purposes, kernel treats unknown predicates as if they were literal NULL
    /// values (which may disable skipping if it "poisons" the predicate), but engines MUST NOT
    /// attempt to interpret them as NULL when evaluating query filters because it could
    /// produce incorrect results. For example, converting `WHERE <fancy-udf-invocation>` to
    /// `WHERE NULL` is equivalent to `WHERE FALSE` and would filter out all rows -- almost
    /// certainly NOT what the query author intended. Use `Predicate::Opaque` for predicates
    /// kernel doesn't understand but which engine can still evaluate.
    Unknown(String),
}
457
458////////////////////////////////////////////////////////////////////////
459// Struct/Enum impls
460////////////////////////////////////////////////////////////////////////
461
462impl BinaryPredicateOp {
463    /// True if this is a comparison for which NULL input always produces NULL output
464    pub(crate) fn is_null_intolerant(&self) -> bool {
465        use BinaryPredicateOp::*;
466        match self {
467            LessThan | GreaterThan | Equal => true,
468            Distinct | In => false, // tolerates NULL input
469        }
470    }
471}
472
473impl JunctionPredicateOp {
474    pub(crate) fn invert(&self) -> JunctionPredicateOp {
475        use JunctionPredicateOp::*;
476        match self {
477            And => Or,
478            Or => And,
479        }
480    }
481}
482
483impl UnaryExpression {
484    pub(crate) fn new(op: UnaryExpressionOp, expr: impl Into<Expression>) -> Self {
485        let expr = Box::new(expr.into());
486        Self { op, expr }
487    }
488}
489
490impl UnaryPredicate {
491    pub(crate) fn new(op: UnaryPredicateOp, expr: impl Into<Expression>) -> Self {
492        let expr = Box::new(expr.into());
493        Self { op, expr }
494    }
495}
496
497impl BinaryExpression {
498    pub(crate) fn new(
499        op: BinaryExpressionOp,
500        left: impl Into<Expression>,
501        right: impl Into<Expression>,
502    ) -> Self {
503        let left = Box::new(left.into());
504        let right = Box::new(right.into());
505        Self { op, left, right }
506    }
507}
508
509impl BinaryPredicate {
510    pub(crate) fn new(
511        op: BinaryPredicateOp,
512        left: impl Into<Expression>,
513        right: impl Into<Expression>,
514    ) -> Self {
515        let left = Box::new(left.into());
516        let right = Box::new(right.into());
517        Self { op, left, right }
518    }
519}
520
521impl VariadicExpression {
522    pub(crate) fn new(
523        op: VariadicExpressionOp,
524        exprs: impl IntoIterator<Item = impl Into<Expression>>,
525    ) -> Self {
526        let exprs = exprs.into_iter().map(Into::into).collect();
527        Self { op, exprs }
528    }
529}
530
531impl ParseJsonExpression {
532    pub(crate) fn new(json_expr: impl Into<Expression>, output_schema: SchemaRef) -> Self {
533        Self {
534            json_expr: Box::new(json_expr.into()),
535            output_schema,
536        }
537    }
538}
539
/// Transforms a `Map<String, String>` column into a struct whose schema is provided by the
/// evaluator's output type (via `result_type`).
///
/// Each row in the map column becomes one row in the output struct column: a `key` -> `value`
/// mapping in the map means the struct field named `key` receives `value`, parsed into the
/// field's target type using Delta's partition value serialization rules
/// ([`PrimitiveType::parse_scalar`]).
///
/// - Missing keys produce null values
/// - Parse errors are propagated (indicating a broken table)
/// - Duplicate map keys are resolved by taking the rightmost entry
///
/// [`PrimitiveType::parse_scalar`]: crate::schema::PrimitiveType::parse_scalar
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct MapToStructExpression {
    /// The expression that evaluates to a `Map<String, String>` column.
    pub map_expr: Box<Expression>,
}
556
557impl MapToStructExpression {
558    pub(crate) fn new(map_expr: impl Into<Expression>) -> Self {
559        Self {
560            map_expr: Box::new(map_expr.into()),
561        }
562    }
563}
564
impl JunctionPredicate {
    /// Builds a junction (AND/OR) predicate node from an operator and its input predicates.
    /// The predicates are stored as given, without any validation or flattening.
    pub(crate) fn new(op: JunctionPredicateOp, preds: Vec<Predicate>) -> Self {
        Self { op, preds }
    }
}
570
571impl Expression {
572    /// Returns a set of columns referenced by this expression.
573    pub fn references(&self) -> HashSet<&ColumnName> {
574        let mut references = GetColumnReferences::default();
575        references.transform_expr(self);
576        references.0
577    }
578
579    /// Create a new column name expression from input satisfying `FromIterator for ColumnName`.
580    pub fn column(field_names: impl CollectInto<ColumnName>) -> Expression {
581        ColumnName::new(field_names).into()
582    }
583
584    /// Create a new expression for a literal value
585    pub fn literal(value: impl Into<Scalar>) -> Self {
586        Self::Literal(value.into())
587    }
588
589    /// Creates a NULL literal expression
590    pub const fn null_literal(data_type: DataType) -> Self {
591        Self::Literal(Scalar::Null(data_type))
592    }
593
594    /// Wraps a predicate as a boolean-valued expression
595    pub fn from_pred(value: Predicate) -> Self {
596        match value {
597            Predicate::BooleanExpression(expr) => expr,
598            _ => Self::Predicate(Box::new(value)),
599        }
600    }
601
602    /// Create a new struct expression.
603    ///
604    /// The field names and types are supplied by the caller at evaluation time via the
605    /// `result_type` parameter of the expression evaluator. Use this when the schema is
606    /// always available from external context (e.g. the expression is the top-level output
607    /// of [`crate::ExpressionEvaluator`]).
608    pub fn struct_from(exprs: impl IntoIterator<Item = impl Into<Arc<Self>>>) -> Self {
609        Self::Struct(exprs.into_iter().map(Into::into).collect(), None)
610    }
611
612    /// Create a new struct expression with a nullability predicate.
613    ///
614    /// When the predicate evaluates to false or null for a row, the entire struct is null
615    /// for that row.
616    pub fn struct_with_nullability_from(
617        exprs: impl IntoIterator<Item = impl Into<Arc<Self>>>,
618        nullability_predicate: impl Into<Arc<Self>>,
619    ) -> Self {
620        Self::Struct(
621            exprs.into_iter().map(Into::into).collect(),
622            Some(nullability_predicate.into()),
623        )
624    }
625
626    /// Creates a new struct patch expression from a raw patch or patch builder.
627    ///
628    /// Returns an expression that applies the supplied sparse patch to an input struct. Passing a
629    /// raw [`ExpressionStructPatch`] is infallible; passing an [`ExpressionStructPatchBuilder`]
630    /// validates and lowers the recorded operations before constructing the expression.
631    ///
632    /// # Errors
633    ///
634    /// Returns an error if the supplied patch builder contains conflicting operations.
635    pub fn struct_patch<P>(patch: P) -> DeltaResult<Self>
636    where
637        P: TryInto<ExpressionStructPatch>,
638        Error: From<P::Error>,
639    {
640        Ok(Self::StructPatch(patch.try_into()?))
641    }
642
643    /// Create a new predicate `self IS NULL`
644    pub fn is_null(self) -> Predicate {
645        Predicate::is_null(self)
646    }
647
648    /// Create a new predicate `self IS NOT NULL`
649    pub fn is_not_null(self) -> Predicate {
650        Predicate::is_not_null(self)
651    }
652
653    /// Create a new predicate `self == other`
654    pub fn eq(self, other: impl Into<Self>) -> Predicate {
655        Predicate::eq(self, other)
656    }
657
658    /// Create a new predicate `self != other`
659    pub fn ne(self, other: impl Into<Self>) -> Predicate {
660        Predicate::ne(self, other)
661    }
662
663    /// Create a new predicate `self <= other`
664    pub fn le(self, other: impl Into<Self>) -> Predicate {
665        Predicate::le(self, other)
666    }
667
668    /// Create a new predicate `self < other`
669    pub fn lt(self, other: impl Into<Self>) -> Predicate {
670        Predicate::lt(self, other)
671    }
672
673    /// Create a new predicate `self >= other`
674    pub fn ge(self, other: impl Into<Self>) -> Predicate {
675        Predicate::ge(self, other)
676    }
677
678    /// Create a new predicate `self > other`
679    pub fn gt(self, other: impl Into<Self>) -> Predicate {
680        Predicate::gt(self, other)
681    }
682
683    /// Create a new predicate `DISTINCT(self, other)`
684    pub fn distinct(self, other: impl Into<Self>) -> Predicate {
685        Predicate::distinct(self, other)
686    }
687
688    /// Creates a new unary expression
689    pub fn unary(op: UnaryExpressionOp, expr: impl Into<Expression>) -> Self {
690        Self::Unary(UnaryExpression::new(op, expr))
691    }
692
693    /// Creates a new binary expression lhs OP rhs
694    pub fn binary(
695        op: BinaryExpressionOp,
696        lhs: impl Into<Expression>,
697        rhs: impl Into<Expression>,
698    ) -> Self {
699        Self::Binary(BinaryExpression::new(op, lhs, rhs))
700    }
701
702    /// Creates a new variadic expression
703    pub fn variadic(
704        op: VariadicExpressionOp,
705        exprs: impl IntoIterator<Item = impl Into<Expression>>,
706    ) -> Self {
707        Self::Variadic(VariadicExpression::new(op, exprs))
708    }
709
710    /// Creates a new COALESCE expression that returns the first non-null value.
711    ///
712    /// COALESCE evaluates expressions in order and returns the first non-null result.
713    /// If all expressions evaluate to null, the result is null.
714    pub fn coalesce(exprs: impl IntoIterator<Item = impl Into<Expression>>) -> Self {
715        Self::variadic(VariadicExpressionOp::Coalesce, exprs)
716    }
717
718    /// Creates a new Array constructor expression. See [`VariadicExpressionOp::Array`].
719    pub fn array(exprs: impl IntoIterator<Item = impl Into<Expression>>) -> Self {
720        Self::variadic(VariadicExpressionOp::Array, exprs)
721    }
722
723    /// Creates a new opaque expression
724    pub fn opaque(
725        op: impl OpaqueExpressionOp,
726        exprs: impl IntoIterator<Item = Expression>,
727    ) -> Self {
728        Self::Opaque(OpaqueExpression::new(Arc::new(op), exprs))
729    }
730
731    /// Creates a new unknown expression
732    pub fn unknown(name: impl Into<String>) -> Self {
733        Self::Unknown(name.into())
734    }
735
736    /// Creates a new ParseJson expression that parses a JSON string column into a struct.
737    /// This is the inverse of `ToJson` - it converts a JSON-encoded string into a struct.
738    pub fn parse_json(json_expr: impl Into<Expression>, output_schema: SchemaRef) -> Self {
739        Self::ParseJson(ParseJsonExpression::new(json_expr, output_schema))
740    }
741
742    /// Extracts keys from a `Map<String, String>` and parses values into a typed struct using
743    /// Delta's partition value serialization rules. The output struct schema is determined by the
744    /// evaluator's `result_type`.
745    pub fn map_to_struct(map_expr: impl Into<Expression>) -> Self {
746        Self::MapToStruct(MapToStructExpression::new(map_expr))
747    }
748}
749
750impl Predicate {
751    /// Returns a set of columns referenced by this predicate.
752    pub fn references(&self) -> HashSet<&ColumnName> {
753        let mut references = GetColumnReferences::default();
754        references.transform_pred(self);
755        references.0
756    }
757
    /// Creates a new boolean column reference. See also [`Expression::column`].
    ///
    /// The column name is built from `field_names` and turned into a predicate via
    /// [`Predicate::from_expr`].
    pub fn column(field_names: impl CollectInto<ColumnName>) -> Predicate {
        Self::from_expr(ColumnName::new(field_names))
    }
762
    /// Create a new literal boolean value, i.e. a [`Predicate::BooleanExpression`] wrapping a
    /// boolean [`Expression::Literal`].
    pub const fn literal(value: bool) -> Self {
        Self::BooleanExpression(Expression::Literal(Scalar::Boolean(value)))
    }
767
    /// Creates a NULL literal boolean value, i.e. a [`Predicate::BooleanExpression`] wrapping a
    /// NULL literal of type [`DataType::BOOLEAN`].
    pub const fn null_literal() -> Self {
        Self::BooleanExpression(Expression::Literal(Scalar::Null(DataType::BOOLEAN)))
    }
772
773    /// Converts a boolean-valued expression into a predicate
774    pub fn from_expr(expr: impl Into<Expression>) -> Self {
775        match expr.into() {
776            Expression::Predicate(p) => *p,
777            expr => Predicate::BooleanExpression(expr),
778        }
779    }
780
781    /// Logical NOT (boolean inversion)
782    pub fn not(pred: impl Into<Self>) -> Self {
783        Self::Not(Box::new(pred.into()))
784    }
785
786    /// Create a new predicate `self IS NULL`
787    pub fn is_null(expr: impl Into<Expression>) -> Predicate {
788        Self::unary(UnaryPredicateOp::IsNull, expr)
789    }
790
791    /// Create a new predicate `self IS NOT NULL`
792    pub fn is_not_null(expr: impl Into<Expression>) -> Predicate {
793        Self::not(Self::is_null(expr))
794    }
795
796    /// Create a new predicate `self == other`
797    pub fn eq(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
798        Self::binary(BinaryPredicateOp::Equal, a, b)
799    }
800
801    /// Create a new predicate `self != other`
802    pub fn ne(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
803        Self::not(Self::binary(BinaryPredicateOp::Equal, a, b))
804    }
805
806    /// Create a new predicate `self <= other`
807    pub fn le(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
808        Self::not(Self::binary(BinaryPredicateOp::GreaterThan, a, b))
809    }
810
811    /// Create a new predicate `self < other`
812    pub fn lt(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
813        Self::binary(BinaryPredicateOp::LessThan, a, b)
814    }
815
816    /// Create a new predicate `self >= other`
817    pub fn ge(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
818        Self::not(Self::binary(BinaryPredicateOp::LessThan, a, b))
819    }
820
821    /// Create a new predicate `self > other`
822    pub fn gt(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
823        Self::binary(BinaryPredicateOp::GreaterThan, a, b)
824    }
825
826    /// Create a new predicate `DISTINCT(self, other)`
827    pub fn distinct(a: impl Into<Expression>, b: impl Into<Expression>) -> Self {
828        Self::binary(BinaryPredicateOp::Distinct, a, b)
829    }
830
831    /// Create a new predicate `self AND other`
832    pub fn and(a: impl Into<Self>, b: impl Into<Self>) -> Self {
833        Self::and_from([a.into(), b.into()])
834    }
835
836    /// Create a new predicate `self OR other`
837    pub fn or(a: impl Into<Self>, b: impl Into<Self>) -> Self {
838        Self::or_from([a.into(), b.into()])
839    }
840
841    /// Creates a new predicate AND(preds...). See [`Self::junction`] for normalization of
842    /// empty and single-element inputs.
843    pub fn and_from(preds: impl IntoIterator<Item = Self>) -> Self {
844        Self::junction(JunctionPredicateOp::And, preds)
845    }
846
847    /// Creates a new predicate OR(preds...). See [`Self::junction`] for normalization of
848    /// empty and single-element inputs.
849    pub fn or_from(preds: impl IntoIterator<Item = Self>) -> Self {
850        Self::junction(JunctionPredicateOp::Or, preds)
851    }
852
853    /// Creates a new unary predicate OP expr
854    pub fn unary(op: UnaryPredicateOp, expr: impl Into<Expression>) -> Self {
855        let expr = Box::new(expr.into());
856        Self::Unary(UnaryPredicate { op, expr })
857    }
858
859    /// Creates a new binary predicate lhs OP rhs
860    pub fn binary(
861        op: BinaryPredicateOp,
862        lhs: impl Into<Expression>,
863        rhs: impl Into<Expression>,
864    ) -> Self {
865        Self::Binary(BinaryPredicate {
866            op,
867            left: Box::new(lhs.into()),
868            right: Box::new(rhs.into()),
869        })
870    }
871
872    /// Creates a new junction predicate OP(preds...). Normalizes degenerate cases:
873    ///
874    /// - Empty junction returns the identity element (the value that has no effect when combined
875    ///   with other predicates under the same operator):
876    ///   - `AND()` -> `true`, because `true AND p` == `p` for any predicate `p`.
877    ///   - `OR()` -> `false`, because `false OR p` == `p` for any predicate `p`.
878    /// - Single-element junction unwraps the element: `AND(p)` / `OR(p)` -> `p`.
879    pub fn junction(op: JunctionPredicateOp, preds: impl IntoIterator<Item = Self>) -> Self {
880        let mut preds: Vec<_> = preds.into_iter().collect();
881        match preds.len() {
882            0 => match op {
883                JunctionPredicateOp::And => Self::literal(true),
884                JunctionPredicateOp::Or => Self::literal(false),
885            },
886            // A junction of one predicate is just that predicate.
887            1 => preds.remove(0),
888            _ => Self::Junction(JunctionPredicate { op, preds }),
889        }
890    }
891
892    /// Creates a new opaque predicate
893    pub fn opaque(op: impl OpaquePredicateOp, exprs: impl IntoIterator<Item = Expression>) -> Self {
894        Self::Opaque(OpaquePredicate::new(Arc::new(op), exprs))
895    }
896
897    /// Creates a new unknown predicate
898    pub fn unknown(name: impl Into<String>) -> Self {
899        Self::Unknown(name.into())
900    }
901}
902
903////////////////////////////////////////////////////////////////////////
904// Trait impls
905////////////////////////////////////////////////////////////////////////
906
907impl PartialEq for OpaquePredicate {
908    fn eq(&self, other: &Self) -> bool {
909        self.op.dyn_eq(other.op.any_ref()) && self.exprs == other.exprs
910    }
911}
912
913impl PartialEq for OpaqueExpression {
914    fn eq(&self, other: &Self) -> bool {
915        self.op.dyn_eq(other.op.any_ref()) && self.exprs == other.exprs
916    }
917}
918
919impl Display for UnaryExpressionOp {
920    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
921        use UnaryExpressionOp::*;
922        match self {
923            ToJson => write!(f, "TO_JSON"),
924        }
925    }
926}
927
928impl Display for BinaryExpressionOp {
929    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
930        use BinaryExpressionOp::*;
931        match self {
932            Plus => write!(f, "+"),
933            Minus => write!(f, "-"),
934            Multiply => write!(f, "*"),
935            Divide => write!(f, "/"),
936        }
937    }
938}
939
940impl Display for VariadicExpressionOp {
941    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
942        use VariadicExpressionOp::*;
943        match self {
944            Coalesce => write!(f, "COALESCE"),
945            Array => write!(f, "ARRAY"),
946        }
947    }
948}
949
950impl Display for BinaryPredicateOp {
951    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
952        use BinaryPredicateOp::*;
953        match self {
954            LessThan => write!(f, "<"),
955            GreaterThan => write!(f, ">"),
956            Equal => write!(f, "="),
957            // TODO(roeap): AFAIK DISTINCT does not have a commonly used operator symbol
958            // so ideally this would not be used as we use Display for rendering expressions
959            // in our code we take care of this, but theirs might not ...
960            Distinct => write!(f, "DISTINCT"),
961            In => write!(f, "IN"),
962        }
963    }
964}
965
966// Helper for displaying the children of variadic expressions and predicates
967fn format_child_list<T: Display>(children: &[T]) -> String {
968    children.iter().map(|c| format!("{c}")).join(", ")
969}
970
971impl Display for Expression {
972    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
973        use Expression::*;
974        match self {
975            Literal(l) => write!(f, "{l}"),
976            Column(name) => write!(f, "Column({name})"),
977            Predicate(p) => write!(f, "{p}"),
978            Struct(exprs, _) => write!(f, "Struct({})", format_child_list(exprs)),
979            StructPatch(patch) => {
980                write!(f, "StructPatch(")?;
981                let mut sep = "";
982                if !patch.prepended_fields.is_empty() {
983                    let prepended_fields = format_child_list(&patch.prepended_fields);
984                    write!(f, "prepend [{prepended_fields}]")?;
985                    sep = ", ";
986                }
987                for (field_name, field_patch) in &patch.field_patches {
988                    if !field_patch.keep_input && field_patch.insertions.is_empty() {
989                        write!(f, "{sep}drop {field_name}")?;
990                        sep = ", ";
991                    }
992                    if !field_patch.insertions.is_empty() {
993                        let insertions = format_child_list(&field_patch.insertions);
994                        let action = if field_patch.keep_input {
995                            "after"
996                        } else {
997                            "replace/after"
998                        };
999                        write!(f, "{sep}{action} {field_name} insert [{insertions}]")?;
1000                        sep = ", ";
1001                    }
1002                }
1003                if !patch.appended_fields.is_empty() {
1004                    let appended_fields = format_child_list(&patch.appended_fields);
1005                    write!(f, "{sep}append [{appended_fields}]")?;
1006                }
1007                write!(f, ")")
1008            }
1009            Unary(UnaryExpression { op, expr }) => write!(f, "{op}({expr})"),
1010            Binary(BinaryExpression { op, left, right }) => write!(f, "{left} {op} {right}"),
1011            Variadic(VariadicExpression { op, exprs }) => {
1012                write!(f, "{op}({})", format_child_list(exprs))
1013            }
1014            Opaque(OpaqueExpression { op, exprs }) => {
1015                write!(f, "{op:?}({})", format_child_list(exprs))
1016            }
1017            Unknown(name) => write!(f, "<unknown: {name}>"),
1018            ParseJson(p) => {
1019                write!(
1020                    f,
1021                    "PARSE_JSON({}, <schema:{} fields>)",
1022                    p.json_expr,
1023                    p.output_schema.fields().len()
1024                )
1025            }
1026            MapToStruct(m) => write!(f, "MAP_TO_STRUCT({})", m.map_expr),
1027        }
1028    }
1029}
1030
1031impl Display for Predicate {
1032    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1033        use Predicate::*;
1034        match self {
1035            BooleanExpression(expr) => write!(f, "{expr}"),
1036            Not(pred) => write!(f, "NOT({pred})"),
1037            Binary(BinaryPredicate {
1038                op: BinaryPredicateOp::Distinct,
1039                left,
1040                right,
1041            }) => write!(f, "DISTINCT({left}, {right})"),
1042            Binary(BinaryPredicate { op, left, right }) => write!(f, "{left} {op} {right}"),
1043            Unary(UnaryPredicate { op, expr }) => match op {
1044                UnaryPredicateOp::IsNull => write!(f, "{expr} IS NULL"),
1045            },
1046            Junction(JunctionPredicate { op, preds }) => {
1047                let op = match op {
1048                    JunctionPredicateOp::And => "AND",
1049                    JunctionPredicateOp::Or => "OR",
1050                };
1051                write!(f, "{op}({})", format_child_list(preds))
1052            }
1053            Opaque(OpaquePredicate { op, exprs }) => {
1054                write!(f, "{op:?}({})", format_child_list(exprs))
1055            }
1056            Unknown(name) => write!(f, "<unknown: {name}>"),
1057        }
1058    }
1059}
1060
1061impl From<Scalar> for Expression {
1062    fn from(value: Scalar) -> Self {
1063        Self::literal(value)
1064    }
1065}
1066
1067impl From<ColumnName> for Expression {
1068    fn from(value: ColumnName) -> Self {
1069        Self::Column(value)
1070    }
1071}
1072
1073impl From<Predicate> for Expression {
1074    fn from(value: Predicate) -> Self {
1075        Self::from_pred(value)
1076    }
1077}
1078
1079impl From<ColumnName> for Predicate {
1080    fn from(value: ColumnName) -> Self {
1081        Self::from_expr(value)
1082    }
1083}
1084
1085impl<R: Into<Expression>> std::ops::Add<R> for Expression {
1086    type Output = Self;
1087
1088    fn add(self, rhs: R) -> Self::Output {
1089        Self::binary(BinaryExpressionOp::Plus, self, rhs)
1090    }
1091}
1092
1093impl<R: Into<Expression>> std::ops::Sub<R> for Expression {
1094    type Output = Self;
1095
1096    fn sub(self, rhs: R) -> Self {
1097        Self::binary(BinaryExpressionOp::Minus, self, rhs)
1098    }
1099}
1100
1101impl<R: Into<Expression>> std::ops::Mul<R> for Expression {
1102    type Output = Self;
1103
1104    fn mul(self, rhs: R) -> Self {
1105        Self::binary(BinaryExpressionOp::Multiply, self, rhs)
1106    }
1107}
1108
1109impl<R: Into<Expression>> std::ops::Div<R> for Expression {
1110    type Output = Self;
1111
1112    fn div(self, rhs: R) -> Self {
1113        Self::binary(BinaryExpressionOp::Divide, self, rhs)
1114    }
1115}
1116
1117/// Retrieves the set of column names referenced by an expression.
1118#[derive(Default)]
1119struct GetColumnReferences<'a>(HashSet<&'a ColumnName>);
1120
1121impl<'a> ExpressionTransform<'a> for GetColumnReferences<'a> {
1122    transform_output_type!(|'a, T| ());
1123
1124    fn transform_expr_column(&mut self, name: &'a ColumnName) {
1125        self.0.insert(name);
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use std::fmt::Debug;
1132
1133    use serde::de::DeserializeOwned;
1134    use serde::Serialize;
1135
1136    use super::{column_expr, column_pred, Expression as Expr, Predicate as Pred};
1137
1138    /// Helper function to verify roundtrip serialization/deserialization
1139    fn assert_roundtrip<T: Serialize + DeserializeOwned + PartialEq + Debug>(value: &T) {
1140        let json = serde_json::to_string(value).expect("serialization should succeed");
1141        let deserialized: T = serde_json::from_str(&json).expect("deserialization should succeed");
1142        assert_eq!(value, &deserialized, "roundtrip should preserve value");
1143    }
1144
1145    #[test]
1146    fn test_expression_format() {
1147        let cases = [
1148            (column_expr!("x"), "Column(x)"),
1149            (
1150                (column_expr!("x") + Expr::literal(4)) / Expr::literal(10) * Expr::literal(42),
1151                "Column(x) + 4 / 10 * 42",
1152            ),
1153            (
1154                Expr::struct_from([column_expr!("x"), Expr::literal(2), Expr::literal(10)]),
1155                "Struct(Column(x), 2, 10)",
1156            ),
1157            (
1158                Expr::array([column_expr!("x"), column_expr!("y"), Expr::literal(0)]),
1159                "ARRAY(Column(x), Column(y), 0)",
1160            ),
1161        ];
1162
1163        for (expr, expected) in cases {
1164            let result = format!("{expr}");
1165            assert_eq!(result, expected);
1166        }
1167    }
1168
1169    #[test]
1170    fn test_predicate_format() {
1171        let cases = [
1172            (column_pred!("x"), "Column(x)"),
1173            (column_expr!("x").eq(Expr::literal(2)), "Column(x) = 2"),
1174            (
1175                (column_expr!("x") - Expr::literal(4)).lt(Expr::literal(10)),
1176                "Column(x) - 4 < 10",
1177            ),
1178            (
1179                Pred::and(
1180                    column_expr!("x").ge(Expr::literal(2)),
1181                    column_expr!("x").le(Expr::literal(10)),
1182                ),
1183                "AND(NOT(Column(x) < 2), NOT(Column(x) > 10))",
1184            ),
1185            (
1186                Pred::and_from([
1187                    column_expr!("x").ge(Expr::literal(2)),
1188                    column_expr!("x").le(Expr::literal(10)),
1189                    column_expr!("x").le(Expr::literal(100)),
1190                ]),
1191                "AND(NOT(Column(x) < 2), NOT(Column(x) > 10), NOT(Column(x) > 100))",
1192            ),
1193            (
1194                Pred::or(
1195                    column_expr!("x").gt(Expr::literal(2)),
1196                    column_expr!("x").lt(Expr::literal(10)),
1197                ),
1198                "OR(Column(x) > 2, Column(x) < 10)",
1199            ),
1200            (
1201                column_expr!("x").eq(Expr::literal("foo")),
1202                "Column(x) = 'foo'",
1203            ),
1204        ];
1205
1206        for (pred, expected) in cases {
1207            let result = format!("{pred}");
1208            assert_eq!(result, expected);
1209        }
1210    }
1211
1212    // ==================== Serde Roundtrip Tests ====================
1213
1214    mod serde_tests {
1215        use std::sync::Arc;
1216
1217        use super::assert_roundtrip;
1218        use crate::expressions::scalars::{ArrayData, DecimalData, MapData, StructData};
1219        use crate::expressions::{
1220            col, column_expr, column_name, lit, BinaryExpressionOp, BinaryPredicateOp, ColumnName,
1221            Expression, ExpressionStructPatchBuilder, Predicate, Scalar, UnaryExpressionOp,
1222        };
1223        use crate::schema::{ArrayType, DataType, DecimalType, MapType, StructField};
1224        use crate::utils::test_utils::assert_result_error_with_message;
1225
1226        // ==================== Expression::Literal Tests ====================
1227
1228        #[test]
1229        fn test_literal_scalars_roundtrip() {
1230            // Test all primitive scalar types that have proper PartialEq
1231            let cases: Vec<Expression> = vec![
1232                // Numeric types
1233                Expression::literal(42i32),         // Integer
1234                Expression::literal(9999999999i64), // Long
1235                Expression::literal(123i16),        // Short
1236                Expression::literal(42i8),          // Byte
1237                Expression::literal(1.12345677_32), // Float
1238                Expression::literal(1.12345667_64), // Double
1239                // String and Boolean
1240                Expression::literal("hello world"),
1241                Expression::literal(true),
1242                Expression::literal(false),
1243                // Temporal types
1244                Expression::Literal(Scalar::Timestamp(1234567890000000)),
1245                Expression::Literal(Scalar::TimestampNtz(1234567890000000)),
1246                Expression::Literal(Scalar::Date(19000)),
1247                // Binary
1248                Expression::Literal(Scalar::Binary(vec![1, 2, 3, 4, 5])),
1249                // Decimal
1250                Expression::Literal(Scalar::Decimal(
1251                    DecimalData::try_new(12345i128, DecimalType::try_new(10, 2).unwrap()).unwrap(),
1252                )),
1253            ];
1254
1255            for expr in &cases {
1256                assert_roundtrip(expr);
1257            }
1258        }
1259
1260        #[test]
1261        fn test_literal_complex_scalars_roundtrip() {
1262            // Test complex scalar types that need JSON comparison (partial_cmp returns None)
1263            let cases: Vec<Expression> = vec![
1264                // Null with different types
1265                Expression::null_literal(DataType::INTEGER),
1266                Expression::null_literal(DataType::STRING),
1267                Expression::null_literal(DataType::BOOLEAN),
1268                // Array
1269                Expression::Literal(Scalar::Array(
1270                    ArrayData::try_new(
1271                        ArrayType::new(DataType::INTEGER, false),
1272                        vec![Scalar::Integer(1), Scalar::Integer(2), Scalar::Integer(3)],
1273                    )
1274                    .unwrap(),
1275                )),
1276                // Map
1277                Expression::Literal(Scalar::Map(
1278                    MapData::try_new(
1279                        MapType::new(DataType::STRING, DataType::INTEGER, false),
1280                        vec![
1281                            (Scalar::String("a".to_string()), Scalar::Integer(1)),
1282                            (Scalar::String("b".to_string()), Scalar::Integer(2)),
1283                        ],
1284                    )
1285                    .unwrap(),
1286                )),
1287                // Struct
1288                Expression::Literal(Scalar::Struct(
1289                    StructData::try_new(
1290                        vec![
1291                            StructField::nullable("x", DataType::INTEGER),
1292                            StructField::nullable("y", DataType::STRING),
1293                        ],
1294                        vec![Scalar::Integer(42), Scalar::String("hello".to_string())],
1295                    )
1296                    .unwrap(),
1297                )),
1298            ];
1299
1300            for expr in &cases {
1301                assert_roundtrip(expr);
1302            }
1303        }
1304
1305        // ==================== Expression::Column Tests ====================
1306
1307        #[test]
1308        fn test_column_expressions_roundtrip() {
1309            let cases: Vec<Expression> = vec![
1310                column_expr!("my_column"),
1311                Expression::column(["parent", "child"]),
1312                Expression::column(["a", "b", "c", "d"]),
1313            ];
1314
1315            for expr in &cases {
1316                assert_roundtrip(expr);
1317            }
1318        }
1319
1320        #[test]
1321        fn test_column_names_roundtrip() {
1322            let cases: Vec<ColumnName> = vec![
1323                column_name!("simple"),
1324                ColumnName::new(["a", "b", "c"]),
1325                ColumnName::default(),
1326            ];
1327
1328            for col in &cases {
1329                assert_roundtrip(col);
1330            }
1331        }
1332
1333        // ==================== Expression Operations Tests ====================
1334
1335        #[test]
1336        fn test_unary_expression_roundtrip() {
1337            let expr = Expression::unary(UnaryExpressionOp::ToJson, column_expr!("data"));
1338            assert_roundtrip(&expr);
1339        }
1340
1341        #[test]
1342        fn test_binary_expressions_roundtrip() {
1343            let ops = [
1344                BinaryExpressionOp::Plus,
1345                BinaryExpressionOp::Minus,
1346                BinaryExpressionOp::Multiply,
1347                BinaryExpressionOp::Divide,
1348            ];
1349
1350            for op in ops {
1351                let expr = Expression::binary(op, column_expr!("a"), Expression::literal(10));
1352                assert_roundtrip(&expr);
1353            }
1354        }
1355
1356        #[test]
1357        fn test_variadic_expression_roundtrip() {
1358            let expr = Expression::coalesce([
1359                column_expr!("a"),
1360                column_expr!("b"),
1361                Expression::literal("default"),
1362            ]);
1363            assert_roundtrip(&expr);
1364        }
1365
1366        #[rstest::rstest]
1367        #[case::array_single(Expression::array([Expression::literal(7i32)]))]
1368        #[case::array_mixed(Expression::array([
1369            column_expr!("a"),
1370            column_expr!("b"),
1371            Expression::literal(42i64),
1372        ]))]
1373        fn test_array_expression_roundtrip(#[case] expr: Expression) {
1374            assert_roundtrip(&expr);
1375        }
1376
1377        #[test]
1378        fn test_nested_arithmetic_expression_roundtrip() {
1379            // (a + b) * (c - d) / 2
1380            let left = Expression::binary(
1381                BinaryExpressionOp::Plus,
1382                column_expr!("a"),
1383                column_expr!("b"),
1384            );
1385            let right = Expression::binary(
1386                BinaryExpressionOp::Minus,
1387                column_expr!("c"),
1388                column_expr!("d"),
1389            );
1390            let mul = Expression::binary(BinaryExpressionOp::Multiply, left, right);
1391            let expr = Expression::binary(BinaryExpressionOp::Divide, mul, Expression::literal(2));
1392            assert_roundtrip(&expr);
1393        }
1394
1395        // ==================== Expression::Struct/StructPatch/Other Tests ====================
1396
1397        #[test]
1398        fn test_struct_expression_roundtrip() {
1399            let expr = Expression::struct_from([
1400                Arc::new(column_expr!("x")),
1401                Arc::new(Expression::literal(42)),
1402                Arc::new(Expression::literal("hello")),
1403            ]);
1404            assert_roundtrip(&expr);
1405        }
1406
1407        #[test]
1408        fn test_transform_expressions_roundtrip() {
1409            let cases: Vec<Expression> = vec![
1410                // Identity transform
1411                Expression::struct_patch(ExpressionStructPatchBuilder::new()).unwrap(),
1412                // Drop field
1413                Expression::struct_patch(ExpressionStructPatchBuilder::new().drop("old_column"))
1414                    .unwrap(),
1415                // Replace field
1416                Expression::struct_patch(
1417                    ExpressionStructPatchBuilder::new().replace("original", lit(0)),
1418                )
1419                .unwrap(),
1420                // Insert fields
1421                Expression::struct_patch(
1422                    ExpressionStructPatchBuilder::new()
1423                        .insert_after("after_col", col!("new_col"))
1424                        .prepend(lit("prepended"))
1425                        .append(lit("appended")),
1426                )
1427                .unwrap(),
1428                // Nested transform
1429                Expression::struct_patch(
1430                    ExpressionStructPatchBuilder::new_nested(["parent", "child"]).drop("to_drop"),
1431                )
1432                .unwrap(),
1433            ];
1434
1435            for expr in &cases {
1436                assert_roundtrip(expr);
1437            }
1438        }
1439
1440        #[test]
1441        fn test_expression_wrapping_predicate_roundtrip() {
1442            let pred = Predicate::eq(column_expr!("x"), Expression::literal(10));
1443            let expr = Expression::from_pred(pred);
1444            assert_roundtrip(&expr);
1445        }
1446
1447        #[test]
1448        fn test_expression_unknown_roundtrip() {
1449            let expr = Expression::unknown("some_unknown_function()");
1450            assert_roundtrip(&expr);
1451        }
1452
1453        #[test]
1454        fn test_map_to_struct_expression_roundtrip() {
1455            let cases: Vec<Expression> = vec![
1456                Expression::map_to_struct(column_expr!("pv")),
1457                Expression::map_to_struct(Expression::literal("ignored")),
1458            ];
1459
1460            for expr in &cases {
1461                assert_roundtrip(expr);
1462            }
1463        }
1464
1465        // ==================== Predicate Tests ====================
1466
1467        #[test]
1468        fn test_predicate_basics_roundtrip() {
1469            let cases: Vec<Predicate> = vec![
1470                // Boolean expression
1471                Predicate::from_expr(column_expr!("is_active")),
1472                // Literals
1473                Predicate::literal(true),
1474                Predicate::literal(false),
1475                // NOT
1476                Predicate::not(Predicate::from_expr(column_expr!("x"))),
1477                // Nested NOT
1478                Predicate::not(Predicate::not(Predicate::gt(
1479                    column_expr!("x"),
1480                    Expression::literal(5),
1481                ))),
1482                // Unknown
1483                Predicate::unknown("some_unknown_predicate()"),
1484                // Unary predicates
1485                Predicate::is_null(column_expr!("nullable_col")),
1486                Predicate::is_not_null(column_expr!("nullable_col")),
1487            ];
1488
1489            for pred in &cases {
1490                assert_roundtrip(pred);
1491            }
1492        }
1493
1494        #[test]
1495        fn test_predicate_null_literal_roundtrip() {
1496            let pred = Predicate::null_literal();
1497            assert_roundtrip(&pred);
1498        }
1499
1500        #[test]
1501        fn test_predicate_comparisons_roundtrip() {
1502            let cases: Vec<Predicate> = vec![
1503                Predicate::eq(column_expr!("x"), Expression::literal(42)),
1504                Predicate::ne(column_expr!("status"), Expression::literal("active")),
1505                Predicate::lt(column_expr!("age"), Expression::literal(18)),
1506                Predicate::le(column_expr!("price"), Expression::literal(100)),
1507                Predicate::gt(column_expr!("score"), Expression::literal(90)),
1508                Predicate::ge(column_expr!("quantity"), Expression::literal(1)),
1509                Predicate::distinct(column_expr!("a"), column_expr!("b")),
1510            ];
1511
1512            for pred in &cases {
1513                assert_roundtrip(pred);
1514            }
1515        }
1516
1517        #[test]
1518        fn test_predicate_in_roundtrip() {
1519            let array_data = ArrayData::try_new(
1520                ArrayType::new(DataType::INTEGER, false),
1521                vec![Scalar::Integer(1), Scalar::Integer(2), Scalar::Integer(3)],
1522            )
1523            .unwrap();
1524            let pred = Predicate::binary(
1525                BinaryPredicateOp::In,
1526                column_expr!("x"),
1527                Expression::Literal(Scalar::Array(array_data)),
1528            );
1529            assert_roundtrip(&pred);
1530        }
1531
1532        #[test]
1533        fn test_predicate_junctions_roundtrip() {
1534            let cases: Vec<Predicate> = vec![
1535                // Simple AND
1536                Predicate::and(
1537                    Predicate::gt(column_expr!("x"), Expression::literal(0)),
1538                    Predicate::lt(column_expr!("x"), Expression::literal(100)),
1539                ),
1540                // Simple OR
1541                Predicate::or(
1542                    Predicate::eq(column_expr!("status"), Expression::literal("active")),
1543                    Predicate::eq(column_expr!("status"), Expression::literal("pending")),
1544                ),
1545                // Multiple AND
1546                Predicate::and_from([
1547                    Predicate::gt(column_expr!("x"), Expression::literal(0)),
1548                    Predicate::lt(column_expr!("x"), Expression::literal(100)),
1549                    Predicate::is_not_null(column_expr!("x")),
1550                ]),
1551                // Multiple OR
1552                Predicate::or_from([
1553                    Predicate::eq(column_expr!("type"), Expression::literal("A")),
1554                    Predicate::eq(column_expr!("type"), Expression::literal("B")),
1555                    Predicate::eq(column_expr!("type"), Expression::literal("C")),
1556                ]),
1557                // Nested: (a > 0 AND b < 100) OR (c = 'special')
1558                Predicate::or(
1559                    Predicate::and(
1560                        Predicate::gt(column_expr!("a"), Expression::literal(0)),
1561                        Predicate::lt(column_expr!("b"), Expression::literal(100)),
1562                    ),
1563                    Predicate::eq(column_expr!("c"), Expression::literal("special")),
1564                ),
1565            ];
1566
1567            for pred in &cases {
1568                assert_roundtrip(pred);
1569            }
1570        }
1571
1572        // ==================== Complex Nested Structures ====================
1573
1574        #[test]
1575        fn test_deeply_nested_structures_roundtrip() {
1576            // COALESCE(a + b, c * d, 0) > 100
1577            let add = Expression::binary(
1578                BinaryExpressionOp::Plus,
1579                column_expr!("a"),
1580                column_expr!("b"),
1581            );
1582            let mul = Expression::binary(
1583                BinaryExpressionOp::Multiply,
1584                column_expr!("c"),
1585                column_expr!("d"),
1586            );
1587            let coalesce = Expression::coalesce([add, mul, Expression::literal(0)]);
1588            let pred = Predicate::gt(coalesce, Expression::literal(100));
1589            assert_roundtrip(&pred);
1590
1591            // Expression wrapping a predicate that references expressions
1592            let inner_pred = Predicate::and(
1593                Predicate::eq(column_expr!("x"), Expression::literal(1)),
1594                Predicate::gt(
1595                    Expression::binary(
1596                        BinaryExpressionOp::Plus,
1597                        column_expr!("y"),
1598                        column_expr!("z"),
1599                    ),
1600                    Expression::literal(10),
1601                ),
1602            );
1603            let expr = Expression::from_pred(inner_pred);
1604            assert_roundtrip(&expr);
1605        }
1606
1607        // ==================== Opaque Variant Failure Tests ====================
1608
1609        #[test]
1610        fn test_opaque_expression_serialize_fails() {
1611            use crate::expressions::{OpaqueExpressionOp, ScalarExpressionEvaluator};
1612            use crate::DeltaResult;
1613
1614            #[derive(Debug, PartialEq)]
1615            struct TestOpaqueExprOp;
1616
1617            impl OpaqueExpressionOp for TestOpaqueExprOp {
1618                fn name(&self) -> &str {
1619                    "test_opaque"
1620                }
1621                fn eval_expr_scalar(
1622                    &self,
1623                    _eval_expr: &ScalarExpressionEvaluator<'_>,
1624                    _exprs: &[Expression],
1625                ) -> DeltaResult<Scalar> {
1626                    Ok(Scalar::Integer(0))
1627                }
1628            }
1629
1630            let expr = Expression::opaque(TestOpaqueExprOp, [Expression::literal(1)]);
1631            let result = serde_json::to_string(&expr);
1632            assert_result_error_with_message(result, "Cannot serialize an Opaque Expression");
1633        }
1634
1635        #[test]
1636        fn test_opaque_predicate_serialize_fails() {
1637            use crate::expressions::{OpaquePredicateOp, ScalarExpressionEvaluator};
1638            use crate::kernel_predicates::{
1639                DirectDataSkippingPredicateEvaluator, DirectPredicateEvaluator,
1640                IndirectDataSkippingPredicateEvaluator,
1641            };
1642            use crate::DeltaResult;
1643
1644            #[derive(Debug, PartialEq)]
1645            struct TestOpaquePredOp;
1646
1647            impl OpaquePredicateOp for TestOpaquePredOp {
1648                fn name(&self) -> &str {
1649                    "test_opaque_pred"
1650                }
1651                fn eval_pred_scalar(
1652                    &self,
1653                    _eval_expr: &ScalarExpressionEvaluator<'_>,
1654                    _eval_pred: &DirectPredicateEvaluator<'_>,
1655                    _exprs: &[Expression],
1656                    _inverted: bool,
1657                ) -> DeltaResult<Option<bool>> {
1658                    Ok(Some(true))
1659                }
1660                fn eval_as_data_skipping_predicate(
1661                    &self,
1662                    _evaluator: &DirectDataSkippingPredicateEvaluator<'_>,
1663                    _exprs: &[Expression],
1664                    _inverted: bool,
1665                ) -> Option<bool> {
1666                    Some(true)
1667                }
1668                fn as_data_skipping_predicate(
1669                    &self,
1670                    _evaluator: &IndirectDataSkippingPredicateEvaluator<'_>,
1671                    _exprs: &[Expression],
1672                    _inverted: bool,
1673                ) -> Option<Predicate> {
1674                    None
1675                }
1676            }
1677
1678            let pred = Predicate::opaque(TestOpaquePredOp, [Expression::literal(1)]);
1679            let result = serde_json::to_string(&pred);
1680            assert_result_error_with_message(result, "Cannot serialize an Opaque Predicate");
1681        }
1682    }
1683
1684    #[test]
1685    fn single_element_and_from_returns_unwrapped_predicate() {
1686        let inner = Pred::gt(column_expr!("x"), Expr::literal(0));
1687        let result = Pred::and_from([inner.clone()]);
1688        assert_eq!(result, inner);
1689    }
1690
1691    #[test]
1692    fn single_element_or_from_returns_unwrapped_predicate() {
1693        let inner = Pred::gt(column_expr!("x"), Expr::literal(0));
1694        let result = Pred::or_from([inner.clone()]);
1695        assert_eq!(result, inner);
1696    }
1697
1698    #[test]
1699    fn multi_element_and_from_returns_junction() {
1700        let p1 = Pred::gt(column_expr!("x"), Expr::literal(0));
1701        let p2 = Pred::lt(column_expr!("x"), Expr::literal(100));
1702        let result = Pred::and_from([p1.clone(), p2.clone()]);
1703        assert!(matches!(result, Pred::Junction(ref j) if j.preds.len() == 2));
1704        assert_eq!(result, Pred::and(p1, p2));
1705    }
1706
1707    #[test]
1708    fn empty_and_from_returns_identity_literal() {
1709        let result = Pred::and_from(std::iter::empty());
1710        assert_eq!(result, Pred::literal(true));
1711    }
1712
1713    #[test]
1714    fn empty_or_from_returns_identity_literal() {
1715        let result = Pred::or_from(std::iter::empty());
1716        assert_eq!(result, Pred::literal(false));
1717    }
1718}