Skip to main content

datafusion_proto/logical_plan/
from_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::datatypes::{DataType, Field};
21use datafusion_common::datatype::DataTypeExt;
22use datafusion_common::{
23    NullEquality, RecursionUnnestOption, Result, ScalarValue, TableReference,
24    UnnestOptions, exec_datafusion_err, internal_err, plan_datafusion_err,
25};
26use datafusion_execution::TaskContext;
27use datafusion_execution::registry::FunctionRegistry;
28use datafusion_expr::dml::InsertOp;
29use datafusion_expr::expr::{Alias, NullTreatment, Placeholder, Sort};
30use datafusion_expr::expr::{Unnest, WildcardOptions};
31use datafusion_expr::logical_plan::Subquery;
32use datafusion_expr::{
33    Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
34    GroupingSet::GroupingSets,
35    JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
36    WindowFrameUnits,
37    expr::{self, InList, WindowFunction},
38    logical_plan::{PlanType, StringifiedPlan},
39};
40use datafusion_expr::{ExprFunctionExt, WriteOp};
41use datafusion_proto_common::{FromProtoError as Error, from_proto::FromOptionalField};
42
43use crate::protobuf::plan_type::PlanTypeEnum::{
44    FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
45};
46use crate::protobuf::{
47    self, AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
48    OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
49    plan_type::PlanTypeEnum::{
50        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
51        FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
52        InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
53        OptimizedPhysicalPlan, PhysicalPlanError,
54    },
55};
56
57use super::{AsLogicalPlan, LogicalExtensionCodec};
58
59impl From<&protobuf::UnnestOptions> for UnnestOptions {
60    fn from(opts: &protobuf::UnnestOptions) -> Self {
61        Self {
62            preserve_nulls: opts.preserve_nulls,
63            recursions: opts
64                .recursions
65                .iter()
66                .map(|r| RecursionUnnestOption {
67                    input_column: r.input_column.as_ref().unwrap().into(),
68                    output_column: r.output_column.as_ref().unwrap().into(),
69                    depth: r.depth as usize,
70                })
71                .collect::<Vec<_>>(),
72        }
73    }
74}
75
76impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
77    fn from(units: protobuf::WindowFrameUnits) -> Self {
78        match units {
79            protobuf::WindowFrameUnits::Rows => Self::Rows,
80            protobuf::WindowFrameUnits::Range => Self::Range,
81            protobuf::WindowFrameUnits::Groups => Self::Groups,
82        }
83    }
84}
85
86impl TryFrom<protobuf::TableReference> for TableReference {
87    type Error = Error;
88
89    fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
90        use protobuf::table_reference::TableReferenceEnum;
91        let table_reference_enum = value
92            .table_reference_enum
93            .ok_or_else(|| Error::required("table_reference_enum"))?;
94
95        match table_reference_enum {
96            TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
97                Ok(TableReference::bare(table))
98            }
99            TableReferenceEnum::Partial(protobuf::PartialTableReference {
100                schema,
101                table,
102            }) => Ok(TableReference::partial(schema, table)),
103            TableReferenceEnum::Full(protobuf::FullTableReference {
104                catalog,
105                schema,
106                table,
107            }) => Ok(TableReference::full(catalog, schema, table)),
108        }
109    }
110}
111
112impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
113    fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
114        Self {
115            plan_type: match stringified_plan
116                .plan_type
117                .as_ref()
118                .and_then(|pt| pt.plan_type_enum.as_ref())
119                .unwrap_or_else(|| {
120                    panic!(
121                        "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
122                    )
123                }) {
124                InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
125                AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
126                    PlanType::AnalyzedLogicalPlan {
127                        analyzer_name:analyzer_name.clone()
128                    }
129                }
130                FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
131                OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
132                    PlanType::OptimizedLogicalPlan {
133                        optimizer_name: optimizer_name.clone(),
134                    }
135                }
136                FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
137                InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
138                InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
139                InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
140                OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
141                    PlanType::OptimizedPhysicalPlan {
142                        optimizer_name: optimizer_name.clone(),
143                    }
144                }
145                FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
146                FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
147                FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
148                PhysicalPlanError(_) => PlanType::PhysicalPlanError,
149            },
150            plan: Arc::new(stringified_plan.plan.clone()),
151        }
152    }
153}
154
155impl TryFrom<protobuf::WindowFrame> for WindowFrame {
156    type Error = Error;
157
158    fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
159        let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
160            .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
161            .into();
162        let start_bound = window.start_bound.required("start_bound")?;
163        let end_bound = window
164            .end_bound
165            .map(|end_bound| match end_bound {
166                protobuf::window_frame::EndBound::Bound(end_bound) => {
167                    end_bound.try_into()
168                }
169            })
170            .transpose()?
171            .unwrap_or(WindowFrameBound::CurrentRow);
172        Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
173    }
174}
175
176impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
177    type Error = Error;
178
179    fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
180        let bound_type =
181            protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
182                .map_err(|_| {
183                    Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
184                })?;
185        match bound_type {
186            protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
187            protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
188                Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
189                None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
190            },
191            protobuf::WindowFrameBoundType::Following => match bound.bound_value {
192                Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
193                None => Ok(Self::Following(ScalarValue::UInt64(None))),
194            },
195        }
196    }
197}
198
199impl From<protobuf::JoinType> for JoinType {
200    fn from(t: protobuf::JoinType) -> Self {
201        match t {
202            protobuf::JoinType::Inner => JoinType::Inner,
203            protobuf::JoinType::Left => JoinType::Left,
204            protobuf::JoinType::Right => JoinType::Right,
205            protobuf::JoinType::Full => JoinType::Full,
206            protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
207            protobuf::JoinType::Rightsemi => JoinType::RightSemi,
208            protobuf::JoinType::Leftanti => JoinType::LeftAnti,
209            protobuf::JoinType::Rightanti => JoinType::RightAnti,
210            protobuf::JoinType::Leftmark => JoinType::LeftMark,
211            protobuf::JoinType::Rightmark => JoinType::RightMark,
212        }
213    }
214}
215
216impl From<protobuf::JoinConstraint> for JoinConstraint {
217    fn from(t: protobuf::JoinConstraint) -> Self {
218        match t {
219            protobuf::JoinConstraint::On => JoinConstraint::On,
220            protobuf::JoinConstraint::Using => JoinConstraint::Using,
221        }
222    }
223}
224
225impl From<protobuf::NullEquality> for NullEquality {
226    fn from(t: protobuf::NullEquality) -> Self {
227        match t {
228            protobuf::NullEquality::NullEqualsNothing => NullEquality::NullEqualsNothing,
229            protobuf::NullEquality::NullEqualsNull => NullEquality::NullEqualsNull,
230        }
231    }
232}
233
234impl From<protobuf::dml_node::Type> for WriteOp {
235    fn from(t: protobuf::dml_node::Type) -> Self {
236        match t {
237            protobuf::dml_node::Type::Update => WriteOp::Update,
238            protobuf::dml_node::Type::Delete => WriteOp::Delete,
239            protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
240            protobuf::dml_node::Type::InsertOverwrite => {
241                WriteOp::Insert(InsertOp::Overwrite)
242            }
243            protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
244            protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
245            protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
246        }
247    }
248}
249
250impl From<protobuf::NullTreatment> for NullTreatment {
251    fn from(t: protobuf::NullTreatment) -> Self {
252        match t {
253            protobuf::NullTreatment::RespectNulls => NullTreatment::RespectNulls,
254            protobuf::NullTreatment::IgnoreNulls => NullTreatment::IgnoreNulls,
255        }
256    }
257}
258
259pub fn parse_expr(
260    proto: &protobuf::LogicalExprNode,
261    ctx: &TaskContext,
262    codec: &dyn LogicalExtensionCodec,
263) -> Result<Expr, Error> {
264    use protobuf::{logical_expr_node::ExprType, window_expr_node};
265
266    let expr_type = proto
267        .expr_type
268        .as_ref()
269        .ok_or_else(|| Error::required("expr_type"))?;
270
271    match expr_type {
272        ExprType::BinaryExpr(binary_expr) => {
273            let op = from_proto_binary_op(&binary_expr.op)?;
274            let operands = parse_exprs(&binary_expr.operands, ctx, codec)?;
275
276            if operands.len() < 2 {
277                return Err(proto_error(
278                    "A binary expression must always have at least 2 operands",
279                ));
280            }
281
282            // Reduce the linearized operands (ordered by left innermost to right
283            // outermost) into a single expression tree.
284            Ok(operands
285                .into_iter()
286                .reduce(|left, right| {
287                    Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
288                })
289                .expect("Binary expression could not be reduced to a single expression."))
290        }
291        ExprType::Column(column) => Ok(Expr::Column(column.into())),
292        ExprType::Literal(literal) => {
293            let scalar_value: ScalarValue = literal.try_into()?;
294            Ok(Expr::Literal(scalar_value, None))
295        }
296        ExprType::WindowExpr(expr) => {
297            let window_function = expr
298                .window_function
299                .as_ref()
300                .ok_or_else(|| Error::required("window_function"))?;
301            let partition_by = parse_exprs(&expr.partition_by, ctx, codec)?;
302            let mut order_by = parse_sorts(&expr.order_by, ctx, codec)?;
303            let window_frame = expr
304                .window_frame
305                .as_ref()
306                .map::<Result<WindowFrame, _>, _>(|window_frame| {
307                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
308                    window_frame
309                        .regularize_order_bys(&mut order_by)
310                        .map(|_| window_frame)
311                })
312                .transpose()?
313                .ok_or_else(|| {
314                    exec_datafusion_err!("missing window frame during deserialization")
315                })?;
316
317            let null_treatment = match expr.null_treatment {
318                Some(null_treatment) => {
319                    let null_treatment  =  protobuf::NullTreatment::try_from(null_treatment)
320                    .map_err(|_| {
321                        proto_error(format!(
322                            "Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
323                        ))
324                    })?;
325                    Some(NullTreatment::from(null_treatment))
326                }
327                None => None,
328            };
329
330            let agg_fn = match window_function {
331                window_expr_node::WindowFunction::Udaf(udaf_name) => {
332                    let udaf_function = match &expr.fun_definition {
333                        Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
334                        None => ctx
335                            .udaf(udaf_name)
336                            .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
337                    };
338                    expr::WindowFunctionDefinition::AggregateUDF(udaf_function)
339                }
340                window_expr_node::WindowFunction::Udwf(udwf_name) => {
341                    let udwf_function = match &expr.fun_definition {
342                        Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
343                        None => ctx
344                            .udwf(udwf_name)
345                            .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
346                    };
347                    expr::WindowFunctionDefinition::WindowUDF(udwf_function)
348                }
349            };
350
351            let args = parse_exprs(&expr.exprs, ctx, codec)?;
352            let mut builder = Expr::from(WindowFunction::new(agg_fn, args))
353                .partition_by(partition_by)
354                .order_by(order_by)
355                .window_frame(window_frame)
356                .null_treatment(null_treatment);
357
358            if expr.distinct {
359                builder = builder.distinct();
360            };
361
362            if let Some(filter) = parse_optional_expr(expr.filter.as_deref(), ctx, codec)?
363            {
364                builder = builder.filter(filter);
365            }
366
367            builder.build().map_err(Error::DataFusionError)
368        }
369        ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
370            parse_required_expr(alias.expr.as_deref(), ctx, "expr", codec)?,
371            alias
372                .relation
373                .first()
374                .map(|r| TableReference::try_from(r.clone()))
375                .transpose()?,
376            alias.alias.clone(),
377        ))),
378        ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
379            is_null.expr.as_deref(),
380            ctx,
381            "expr",
382            codec,
383        )?))),
384        ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
385            parse_required_expr(is_not_null.expr.as_deref(), ctx, "expr", codec)?,
386        ))),
387        ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
388            not.expr.as_deref(),
389            ctx,
390            "expr",
391            codec,
392        )?))),
393        ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
394            msg.expr.as_deref(),
395            ctx,
396            "expr",
397            codec,
398        )?))),
399        ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
400            msg.expr.as_deref(),
401            ctx,
402            "expr",
403            codec,
404        )?))),
405        ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
406            msg.expr.as_deref(),
407            ctx,
408            "expr",
409            codec,
410        )?))),
411        ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
412            msg.expr.as_deref(),
413            ctx,
414            "expr",
415            codec,
416        )?))),
417        ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
418            msg.expr.as_deref(),
419            ctx,
420            "expr",
421            codec,
422        )?))),
423        ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
424            parse_required_expr(msg.expr.as_deref(), ctx, "expr", codec)?,
425        ))),
426        ExprType::Between(between) => Ok(Expr::Between(Between::new(
427            Box::new(parse_required_expr(
428                between.expr.as_deref(),
429                ctx,
430                "expr",
431                codec,
432            )?),
433            between.negated,
434            Box::new(parse_required_expr(
435                between.low.as_deref(),
436                ctx,
437                "expr",
438                codec,
439            )?),
440            Box::new(parse_required_expr(
441                between.high.as_deref(),
442                ctx,
443                "expr",
444                codec,
445            )?),
446        ))),
447        ExprType::Like(like) => Ok(Expr::Like(Like::new(
448            like.negated,
449            Box::new(parse_required_expr(
450                like.expr.as_deref(),
451                ctx,
452                "expr",
453                codec,
454            )?),
455            Box::new(parse_required_expr(
456                like.pattern.as_deref(),
457                ctx,
458                "pattern",
459                codec,
460            )?),
461            parse_escape_char(&like.escape_char)?,
462            false,
463        ))),
464        ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
465            like.negated,
466            Box::new(parse_required_expr(
467                like.expr.as_deref(),
468                ctx,
469                "expr",
470                codec,
471            )?),
472            Box::new(parse_required_expr(
473                like.pattern.as_deref(),
474                ctx,
475                "pattern",
476                codec,
477            )?),
478            parse_escape_char(&like.escape_char)?,
479            true,
480        ))),
481        ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
482            like.negated,
483            Box::new(parse_required_expr(
484                like.expr.as_deref(),
485                ctx,
486                "expr",
487                codec,
488            )?),
489            Box::new(parse_required_expr(
490                like.pattern.as_deref(),
491                ctx,
492                "pattern",
493                codec,
494            )?),
495            parse_escape_char(&like.escape_char)?,
496            false,
497        ))),
498        ExprType::Case(case) => {
499            let when_then_expr = case
500                .when_then_expr
501                .iter()
502                .map(|e| {
503                    let when_expr = parse_required_expr(
504                        e.when_expr.as_ref(),
505                        ctx,
506                        "when_expr",
507                        codec,
508                    )?;
509                    let then_expr = parse_required_expr(
510                        e.then_expr.as_ref(),
511                        ctx,
512                        "then_expr",
513                        codec,
514                    )?;
515                    Ok((Box::new(when_expr), Box::new(then_expr)))
516                })
517                .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
518            Ok(Expr::Case(Case::new(
519                parse_optional_expr(case.expr.as_deref(), ctx, codec)?.map(Box::new),
520                when_then_expr,
521                parse_optional_expr(case.else_expr.as_deref(), ctx, codec)?.map(Box::new),
522            )))
523        }
524        ExprType::Cast(cast) => {
525            let expr = Box::new(parse_required_expr(
526                cast.expr.as_deref(),
527                ctx,
528                "expr",
529                codec,
530            )?);
531            let data_type: DataType = cast.arrow_type.as_ref().required("arrow_type")?;
532            let field = data_type
533                .into_nullable_field()
534                .with_nullable(cast.nullable.unwrap_or(true));
535            Ok(Expr::Cast(Cast::new_from_field(expr, Arc::new(field))))
536        }
537        ExprType::TryCast(cast) => {
538            let expr = Box::new(parse_required_expr(
539                cast.expr.as_deref(),
540                ctx,
541                "expr",
542                codec,
543            )?);
544            let data_type: DataType = cast.arrow_type.as_ref().required("arrow_type")?;
545            let field = data_type
546                .into_nullable_field()
547                .with_nullable(cast.nullable.unwrap_or(true));
548            Ok(Expr::TryCast(TryCast::new_from_field(
549                expr,
550                Arc::new(field),
551            )))
552        }
553        ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
554            parse_required_expr(negative.expr.as_deref(), ctx, "expr", codec)?,
555        ))),
556        ExprType::Unnest(unnest) => {
557            let mut exprs = parse_exprs(&unnest.exprs, ctx, codec)?;
558            if exprs.len() != 1 {
559                return Err(proto_error("Unnest must have exactly one expression"));
560            }
561            Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
562        }
563        ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
564            Box::new(parse_required_expr(
565                in_list.expr.as_deref(),
566                ctx,
567                "expr",
568                codec,
569            )?),
570            parse_exprs(&in_list.list, ctx, codec)?,
571            in_list.negated,
572        ))),
573        ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
574            let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
575            #[expect(deprecated)]
576            Ok(Expr::Wildcard {
577                qualifier,
578                options: Box::new(WildcardOptions::default()),
579            })
580        }
581        ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
582            fun_name,
583            args,
584            fun_definition,
585        }) => {
586            let scalar_fn = match fun_definition {
587                Some(buf) => codec.try_decode_udf(fun_name, buf)?,
588                None => ctx
589                    .udf(fun_name.as_str())
590                    .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
591            };
592            Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
593                scalar_fn,
594                parse_exprs(args, ctx, codec)?,
595            )))
596        }
597        ExprType::AggregateUdfExpr(pb) => {
598            let agg_fn = match &pb.fun_definition {
599                Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
600                None => ctx
601                    .udaf(&pb.fun_name)
602                    .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
603            };
604            let null_treatment = match pb.null_treatment {
605                Some(null_treatment) => {
606                    let null_treatment  =  protobuf::NullTreatment::try_from(null_treatment)
607                    .map_err(|_| {
608                        proto_error(format!(
609                            "Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
610                        ))
611                    })?;
612                    Some(NullTreatment::from(null_treatment))
613                }
614                None => None,
615            };
616
617            Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
618                agg_fn,
619                parse_exprs(&pb.args, ctx, codec)?,
620                pb.distinct,
621                parse_optional_expr(pb.filter.as_deref(), ctx, codec)?.map(Box::new),
622                parse_sorts(&pb.order_by, ctx, codec)?,
623                null_treatment,
624            )))
625        }
626
627        ExprType::GroupingSet(GroupingSetNode { expr }) => {
628            Ok(Expr::GroupingSet(GroupingSets(
629                expr.iter()
630                    .map(|expr_list| parse_exprs(&expr_list.expr, ctx, codec))
631                    .collect::<Result<Vec<_>, Error>>()?,
632            )))
633        }
634        ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
635            parse_exprs(expr, ctx, codec)?,
636        ))),
637        ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
638            GroupingSet::Rollup(parse_exprs(expr, ctx, codec)?),
639        )),
640        ExprType::Placeholder(PlaceholderNode {
641            id,
642            data_type,
643            nullable,
644            metadata,
645        }) => match data_type {
646            None => Ok(Expr::Placeholder(Placeholder::new_with_field(
647                id.clone(),
648                None,
649            ))),
650            Some(data_type) => {
651                let field =
652                    Field::new("", data_type.try_into()?, nullable.unwrap_or(true))
653                        .with_metadata(metadata.clone());
654                Ok(Expr::Placeholder(Placeholder::new_with_field(
655                    id.clone(),
656                    Some(field.into()),
657                )))
658            }
659        },
660        ExprType::ScalarSubqueryExpr(sq) => {
661            let subquery = parse_subquery(
662                sq.subquery
663                    .as_deref()
664                    .ok_or_else(|| Error::required("ScalarSubqueryExprNode.subquery"))?,
665                ctx,
666                codec,
667            )?;
668            Ok(Expr::ScalarSubquery(subquery))
669        }
670    }
671}
672
673fn parse_subquery(
674    proto: &protobuf::SubqueryNode,
675    ctx: &TaskContext,
676    codec: &dyn LogicalExtensionCodec,
677) -> Result<Subquery, Error> {
678    let plan_node = proto
679        .subquery
680        .as_ref()
681        .ok_or_else(|| Error::required("SubqueryNode.subquery"))?;
682    let plan = plan_node.try_into_logical_plan(ctx, codec)?;
683    let outer_ref_columns = parse_exprs(&proto.outer_ref_columns, ctx, codec)?;
684    Ok(Subquery {
685        subquery: Arc::new(plan),
686        outer_ref_columns,
687        spans: Default::default(),
688    })
689}
690
691/// Parse a vector of `protobuf::LogicalExprNode`s.
692pub fn parse_exprs<'a, I>(
693    protos: I,
694    ctx: &TaskContext,
695    codec: &dyn LogicalExtensionCodec,
696) -> Result<Vec<Expr>, Error>
697where
698    I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
699{
700    let res = protos
701        .into_iter()
702        .map(|elem| {
703            parse_expr(elem, ctx, codec).map_err(|e| plan_datafusion_err!("{}", e))
704        })
705        .collect::<Result<Vec<_>>>()?;
706    Ok(res)
707}
708
709pub fn parse_sorts<'a, I>(
710    protos: I,
711    ctx: &TaskContext,
712    codec: &dyn LogicalExtensionCodec,
713) -> Result<Vec<Sort>, Error>
714where
715    I: IntoIterator<Item = &'a protobuf::SortExprNode>,
716{
717    protos
718        .into_iter()
719        .map(|sort| parse_sort(sort, ctx, codec))
720        .collect::<Result<Vec<Sort>, Error>>()
721}
722
723pub fn parse_sort(
724    sort: &protobuf::SortExprNode,
725    ctx: &TaskContext,
726    codec: &dyn LogicalExtensionCodec,
727) -> Result<Sort, Error> {
728    Ok(Sort::new(
729        parse_required_expr(sort.expr.as_ref(), ctx, "expr", codec)?,
730        sort.asc,
731        sort.nulls_first,
732    ))
733}
734
735/// Parse an optional escape_char for Like, ILike, SimilarTo
736fn parse_escape_char(s: &str) -> Result<Option<char>> {
737    match s.len() {
738        0 => Ok(None),
739        1 => Ok(s.chars().next()),
740        _ => internal_err!("Invalid length for escape char"),
741    }
742}
743
744pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
745    match op {
746        "And" => Ok(Operator::And),
747        "Or" => Ok(Operator::Or),
748        "Eq" => Ok(Operator::Eq),
749        "NotEq" => Ok(Operator::NotEq),
750        "LtEq" => Ok(Operator::LtEq),
751        "Lt" => Ok(Operator::Lt),
752        "Gt" => Ok(Operator::Gt),
753        "GtEq" => Ok(Operator::GtEq),
754        "Plus" => Ok(Operator::Plus),
755        "Minus" => Ok(Operator::Minus),
756        "Multiply" => Ok(Operator::Multiply),
757        "Divide" => Ok(Operator::Divide),
758        "Modulo" => Ok(Operator::Modulo),
759        "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
760        "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
761        "BitwiseAnd" => Ok(Operator::BitwiseAnd),
762        "BitwiseOr" => Ok(Operator::BitwiseOr),
763        "BitwiseXor" => Ok(Operator::BitwiseXor),
764        "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
765        "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
766        "RegexIMatch" => Ok(Operator::RegexIMatch),
767        "RegexMatch" => Ok(Operator::RegexMatch),
768        "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
769        "RegexNotMatch" => Ok(Operator::RegexNotMatch),
770        "LikeMatch" => Ok(Operator::LikeMatch),
771        "ILikeMatch" => Ok(Operator::ILikeMatch),
772        "NotLikeMatch" => Ok(Operator::NotLikeMatch),
773        "NotILikeMatch" => Ok(Operator::NotILikeMatch),
774        "StringConcat" => Ok(Operator::StringConcat),
775        "AtArrow" => Ok(Operator::AtArrow),
776        "ArrowAt" => Ok(Operator::ArrowAt),
777        other => Err(proto_error(format!(
778            "Unsupported binary operator '{other:?}'"
779        ))),
780    }
781}
782
783fn parse_optional_expr(
784    p: Option<&protobuf::LogicalExprNode>,
785    ctx: &TaskContext,
786    codec: &dyn LogicalExtensionCodec,
787) -> Result<Option<Expr>, Error> {
788    match p {
789        Some(expr) => parse_expr(expr, ctx, codec).map(Some),
790        None => Ok(None),
791    }
792}
793
794fn parse_required_expr(
795    p: Option<&protobuf::LogicalExprNode>,
796    ctx: &TaskContext,
797    field: impl Into<String>,
798    codec: &dyn LogicalExtensionCodec,
799) -> Result<Expr, Error> {
800    match p {
801        Some(expr) => parse_expr(expr, ctx, codec),
802        None => Err(Error::required(field)),
803    }
804}
805
806fn proto_error<S: Into<String>>(message: S) -> Error {
807    Error::General(message.into())
808}