datafusion_proto/logical_plan/
from_proto.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use datafusion_common::{
22    NullEquality, RecursionUnnestOption, Result, ScalarValue, TableReference,
23    UnnestOptions, exec_datafusion_err, internal_err, plan_datafusion_err,
24};
25use datafusion_execution::registry::FunctionRegistry;
26use datafusion_expr::dml::InsertOp;
27use datafusion_expr::expr::{Alias, NullTreatment, Placeholder, Sort};
28use datafusion_expr::expr::{Unnest, WildcardOptions};
29use datafusion_expr::{
30    Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
31    GroupingSet::GroupingSets,
32    JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
33    WindowFrameUnits,
34    expr::{self, InList, WindowFunction},
35    logical_plan::{PlanType, StringifiedPlan},
36};
37use datafusion_expr::{ExprFunctionExt, WriteOp};
38use datafusion_proto_common::{FromProtoError as Error, from_proto::FromOptionalField};
39
40use crate::protobuf::plan_type::PlanTypeEnum::{
41    FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
42};
43use crate::protobuf::{
44    self, AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
45    OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
46    plan_type::PlanTypeEnum::{
47        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
48        FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
49        InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
50        OptimizedPhysicalPlan, PhysicalPlanError,
51    },
52};
53
54use super::LogicalExtensionCodec;
55
56impl From<&protobuf::UnnestOptions> for UnnestOptions {
57    fn from(opts: &protobuf::UnnestOptions) -> Self {
58        Self {
59            preserve_nulls: opts.preserve_nulls,
60            recursions: opts
61                .recursions
62                .iter()
63                .map(|r| RecursionUnnestOption {
64                    input_column: r.input_column.as_ref().unwrap().into(),
65                    output_column: r.output_column.as_ref().unwrap().into(),
66                    depth: r.depth as usize,
67                })
68                .collect::<Vec<_>>(),
69        }
70    }
71}
72
73impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
74    fn from(units: protobuf::WindowFrameUnits) -> Self {
75        match units {
76            protobuf::WindowFrameUnits::Rows => Self::Rows,
77            protobuf::WindowFrameUnits::Range => Self::Range,
78            protobuf::WindowFrameUnits::Groups => Self::Groups,
79        }
80    }
81}
82
83impl TryFrom<protobuf::TableReference> for TableReference {
84    type Error = Error;
85
86    fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
87        use protobuf::table_reference::TableReferenceEnum;
88        let table_reference_enum = value
89            .table_reference_enum
90            .ok_or_else(|| Error::required("table_reference_enum"))?;
91
92        match table_reference_enum {
93            TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
94                Ok(TableReference::bare(table))
95            }
96            TableReferenceEnum::Partial(protobuf::PartialTableReference {
97                schema,
98                table,
99            }) => Ok(TableReference::partial(schema, table)),
100            TableReferenceEnum::Full(protobuf::FullTableReference {
101                catalog,
102                schema,
103                table,
104            }) => Ok(TableReference::full(catalog, schema, table)),
105        }
106    }
107}
108
109impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
110    fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
111        Self {
112            plan_type: match stringified_plan
113                .plan_type
114                .as_ref()
115                .and_then(|pt| pt.plan_type_enum.as_ref())
116                .unwrap_or_else(|| {
117                    panic!(
118                        "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
119                    )
120                }) {
121                InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
122                AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
123                    PlanType::AnalyzedLogicalPlan {
124                        analyzer_name:analyzer_name.clone()
125                    }
126                }
127                FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
128                OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
129                    PlanType::OptimizedLogicalPlan {
130                        optimizer_name: optimizer_name.clone(),
131                    }
132                }
133                FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
134                InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
135                InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
136                InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
137                OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
138                    PlanType::OptimizedPhysicalPlan {
139                        optimizer_name: optimizer_name.clone(),
140                    }
141                }
142                FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
143                FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
144                FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
145                PhysicalPlanError(_) => PlanType::PhysicalPlanError,
146            },
147            plan: Arc::new(stringified_plan.plan.clone()),
148        }
149    }
150}
151
152impl TryFrom<protobuf::WindowFrame> for WindowFrame {
153    type Error = Error;
154
155    fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
156        let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
157            .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
158            .into();
159        let start_bound = window.start_bound.required("start_bound")?;
160        let end_bound = window
161            .end_bound
162            .map(|end_bound| match end_bound {
163                protobuf::window_frame::EndBound::Bound(end_bound) => {
164                    end_bound.try_into()
165                }
166            })
167            .transpose()?
168            .unwrap_or(WindowFrameBound::CurrentRow);
169        Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
170    }
171}
172
173impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
174    type Error = Error;
175
176    fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
177        let bound_type =
178            protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
179                .map_err(|_| {
180                    Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
181                })?;
182        match bound_type {
183            protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
184            protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
185                Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
186                None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
187            },
188            protobuf::WindowFrameBoundType::Following => match bound.bound_value {
189                Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
190                None => Ok(Self::Following(ScalarValue::UInt64(None))),
191            },
192        }
193    }
194}
195
196impl From<protobuf::JoinType> for JoinType {
197    fn from(t: protobuf::JoinType) -> Self {
198        match t {
199            protobuf::JoinType::Inner => JoinType::Inner,
200            protobuf::JoinType::Left => JoinType::Left,
201            protobuf::JoinType::Right => JoinType::Right,
202            protobuf::JoinType::Full => JoinType::Full,
203            protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
204            protobuf::JoinType::Rightsemi => JoinType::RightSemi,
205            protobuf::JoinType::Leftanti => JoinType::LeftAnti,
206            protobuf::JoinType::Rightanti => JoinType::RightAnti,
207            protobuf::JoinType::Leftmark => JoinType::LeftMark,
208            protobuf::JoinType::Rightmark => JoinType::RightMark,
209        }
210    }
211}
212
213impl From<protobuf::JoinConstraint> for JoinConstraint {
214    fn from(t: protobuf::JoinConstraint) -> Self {
215        match t {
216            protobuf::JoinConstraint::On => JoinConstraint::On,
217            protobuf::JoinConstraint::Using => JoinConstraint::Using,
218        }
219    }
220}
221
222impl From<protobuf::NullEquality> for NullEquality {
223    fn from(t: protobuf::NullEquality) -> Self {
224        match t {
225            protobuf::NullEquality::NullEqualsNothing => NullEquality::NullEqualsNothing,
226            protobuf::NullEquality::NullEqualsNull => NullEquality::NullEqualsNull,
227        }
228    }
229}
230
231impl From<protobuf::dml_node::Type> for WriteOp {
232    fn from(t: protobuf::dml_node::Type) -> Self {
233        match t {
234            protobuf::dml_node::Type::Update => WriteOp::Update,
235            protobuf::dml_node::Type::Delete => WriteOp::Delete,
236            protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
237            protobuf::dml_node::Type::InsertOverwrite => {
238                WriteOp::Insert(InsertOp::Overwrite)
239            }
240            protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
241            protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
242        }
243    }
244}
245
246impl From<protobuf::NullTreatment> for NullTreatment {
247    fn from(t: protobuf::NullTreatment) -> Self {
248        match t {
249            protobuf::NullTreatment::RespectNulls => NullTreatment::RespectNulls,
250            protobuf::NullTreatment::IgnoreNulls => NullTreatment::IgnoreNulls,
251        }
252    }
253}
254
255pub fn parse_expr(
256    proto: &protobuf::LogicalExprNode,
257    registry: &dyn FunctionRegistry,
258    codec: &dyn LogicalExtensionCodec,
259) -> Result<Expr, Error> {
260    use protobuf::{logical_expr_node::ExprType, window_expr_node};
261
262    let expr_type = proto
263        .expr_type
264        .as_ref()
265        .ok_or_else(|| Error::required("expr_type"))?;
266
267    match expr_type {
268        ExprType::BinaryExpr(binary_expr) => {
269            let op = from_proto_binary_op(&binary_expr.op)?;
270            let operands = parse_exprs(&binary_expr.operands, registry, codec)?;
271
272            if operands.len() < 2 {
273                return Err(proto_error(
274                    "A binary expression must always have at least 2 operands",
275                ));
276            }
277
278            // Reduce the linearized operands (ordered by left innermost to right
279            // outermost) into a single expression tree.
280            Ok(operands
281                .into_iter()
282                .reduce(|left, right| {
283                    Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
284                })
285                .expect("Binary expression could not be reduced to a single expression."))
286        }
287        ExprType::Column(column) => Ok(Expr::Column(column.into())),
288        ExprType::Literal(literal) => {
289            let scalar_value: ScalarValue = literal.try_into()?;
290            Ok(Expr::Literal(scalar_value, None))
291        }
292        ExprType::WindowExpr(expr) => {
293            let window_function = expr
294                .window_function
295                .as_ref()
296                .ok_or_else(|| Error::required("window_function"))?;
297            let partition_by = parse_exprs(&expr.partition_by, registry, codec)?;
298            let mut order_by = parse_sorts(&expr.order_by, registry, codec)?;
299            let window_frame = expr
300                .window_frame
301                .as_ref()
302                .map::<Result<WindowFrame, _>, _>(|window_frame| {
303                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
304                    window_frame
305                        .regularize_order_bys(&mut order_by)
306                        .map(|_| window_frame)
307                })
308                .transpose()?
309                .ok_or_else(|| {
310                    exec_datafusion_err!("missing window frame during deserialization")
311                })?;
312
313            let null_treatment = match expr.null_treatment {
314                Some(null_treatment) => {
315                    let null_treatment  =  protobuf::NullTreatment::try_from(null_treatment)
316                    .map_err(|_| {
317                        proto_error(format!(
318                            "Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
319                        ))
320                    })?;
321                    Some(NullTreatment::from(null_treatment))
322                }
323                None => None,
324            };
325
326            let agg_fn = match window_function {
327                window_expr_node::WindowFunction::Udaf(udaf_name) => {
328                    let udaf_function = match &expr.fun_definition {
329                        Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
330                        None => registry
331                            .udaf(udaf_name)
332                            .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
333                    };
334                    expr::WindowFunctionDefinition::AggregateUDF(udaf_function)
335                }
336                window_expr_node::WindowFunction::Udwf(udwf_name) => {
337                    let udwf_function = match &expr.fun_definition {
338                        Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
339                        None => registry
340                            .udwf(udwf_name)
341                            .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
342                    };
343                    expr::WindowFunctionDefinition::WindowUDF(udwf_function)
344                }
345            };
346
347            let args = parse_exprs(&expr.exprs, registry, codec)?;
348            let mut builder = Expr::from(WindowFunction::new(agg_fn, args))
349                .partition_by(partition_by)
350                .order_by(order_by)
351                .window_frame(window_frame)
352                .null_treatment(null_treatment);
353
354            if expr.distinct {
355                builder = builder.distinct();
356            };
357
358            if let Some(filter) =
359                parse_optional_expr(expr.filter.as_deref(), registry, codec)?
360            {
361                builder = builder.filter(filter);
362            }
363
364            builder.build().map_err(Error::DataFusionError)
365        }
366        ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
367            parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?,
368            alias
369                .relation
370                .first()
371                .map(|r| TableReference::try_from(r.clone()))
372                .transpose()?,
373            alias.alias.clone(),
374        ))),
375        ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
376            is_null.expr.as_deref(),
377            registry,
378            "expr",
379            codec,
380        )?))),
381        ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
382            parse_required_expr(is_not_null.expr.as_deref(), registry, "expr", codec)?,
383        ))),
384        ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
385            not.expr.as_deref(),
386            registry,
387            "expr",
388            codec,
389        )?))),
390        ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
391            msg.expr.as_deref(),
392            registry,
393            "expr",
394            codec,
395        )?))),
396        ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
397            msg.expr.as_deref(),
398            registry,
399            "expr",
400            codec,
401        )?))),
402        ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
403            msg.expr.as_deref(),
404            registry,
405            "expr",
406            codec,
407        )?))),
408        ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
409            msg.expr.as_deref(),
410            registry,
411            "expr",
412            codec,
413        )?))),
414        ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
415            msg.expr.as_deref(),
416            registry,
417            "expr",
418            codec,
419        )?))),
420        ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
421            parse_required_expr(msg.expr.as_deref(), registry, "expr", codec)?,
422        ))),
423        ExprType::Between(between) => Ok(Expr::Between(Between::new(
424            Box::new(parse_required_expr(
425                between.expr.as_deref(),
426                registry,
427                "expr",
428                codec,
429            )?),
430            between.negated,
431            Box::new(parse_required_expr(
432                between.low.as_deref(),
433                registry,
434                "expr",
435                codec,
436            )?),
437            Box::new(parse_required_expr(
438                between.high.as_deref(),
439                registry,
440                "expr",
441                codec,
442            )?),
443        ))),
444        ExprType::Like(like) => Ok(Expr::Like(Like::new(
445            like.negated,
446            Box::new(parse_required_expr(
447                like.expr.as_deref(),
448                registry,
449                "expr",
450                codec,
451            )?),
452            Box::new(parse_required_expr(
453                like.pattern.as_deref(),
454                registry,
455                "pattern",
456                codec,
457            )?),
458            parse_escape_char(&like.escape_char)?,
459            false,
460        ))),
461        ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
462            like.negated,
463            Box::new(parse_required_expr(
464                like.expr.as_deref(),
465                registry,
466                "expr",
467                codec,
468            )?),
469            Box::new(parse_required_expr(
470                like.pattern.as_deref(),
471                registry,
472                "pattern",
473                codec,
474            )?),
475            parse_escape_char(&like.escape_char)?,
476            true,
477        ))),
478        ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
479            like.negated,
480            Box::new(parse_required_expr(
481                like.expr.as_deref(),
482                registry,
483                "expr",
484                codec,
485            )?),
486            Box::new(parse_required_expr(
487                like.pattern.as_deref(),
488                registry,
489                "pattern",
490                codec,
491            )?),
492            parse_escape_char(&like.escape_char)?,
493            false,
494        ))),
495        ExprType::Case(case) => {
496            let when_then_expr = case
497                .when_then_expr
498                .iter()
499                .map(|e| {
500                    let when_expr = parse_required_expr(
501                        e.when_expr.as_ref(),
502                        registry,
503                        "when_expr",
504                        codec,
505                    )?;
506                    let then_expr = parse_required_expr(
507                        e.then_expr.as_ref(),
508                        registry,
509                        "then_expr",
510                        codec,
511                    )?;
512                    Ok((Box::new(when_expr), Box::new(then_expr)))
513                })
514                .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
515            Ok(Expr::Case(Case::new(
516                parse_optional_expr(case.expr.as_deref(), registry, codec)?.map(Box::new),
517                when_then_expr,
518                parse_optional_expr(case.else_expr.as_deref(), registry, codec)?
519                    .map(Box::new),
520            )))
521        }
522        ExprType::Cast(cast) => {
523            let expr = Box::new(parse_required_expr(
524                cast.expr.as_deref(),
525                registry,
526                "expr",
527                codec,
528            )?);
529            let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
530            Ok(Expr::Cast(Cast::new(expr, data_type)))
531        }
532        ExprType::TryCast(cast) => {
533            let expr = Box::new(parse_required_expr(
534                cast.expr.as_deref(),
535                registry,
536                "expr",
537                codec,
538            )?);
539            let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
540            Ok(Expr::TryCast(TryCast::new(expr, data_type)))
541        }
542        ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
543            parse_required_expr(negative.expr.as_deref(), registry, "expr", codec)?,
544        ))),
545        ExprType::Unnest(unnest) => {
546            let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
547            if exprs.len() != 1 {
548                return Err(proto_error("Unnest must have exactly one expression"));
549            }
550            Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
551        }
552        ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
553            Box::new(parse_required_expr(
554                in_list.expr.as_deref(),
555                registry,
556                "expr",
557                codec,
558            )?),
559            parse_exprs(&in_list.list, registry, codec)?,
560            in_list.negated,
561        ))),
562        ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
563            let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
564            #[expect(deprecated)]
565            Ok(Expr::Wildcard {
566                qualifier,
567                options: Box::new(WildcardOptions::default()),
568            })
569        }
570        ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
571            fun_name,
572            args,
573            fun_definition,
574        }) => {
575            let scalar_fn = match fun_definition {
576                Some(buf) => codec.try_decode_udf(fun_name, buf)?,
577                None => registry
578                    .udf(fun_name.as_str())
579                    .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
580            };
581            Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
582                scalar_fn,
583                parse_exprs(args, registry, codec)?,
584            )))
585        }
586        ExprType::AggregateUdfExpr(pb) => {
587            let agg_fn = match &pb.fun_definition {
588                Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
589                None => registry
590                    .udaf(&pb.fun_name)
591                    .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
592            };
593            let null_treatment = match pb.null_treatment {
594                Some(null_treatment) => {
595                    let null_treatment  =  protobuf::NullTreatment::try_from(null_treatment)
596                    .map_err(|_| {
597                        proto_error(format!(
598                            "Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
599                        ))
600                    })?;
601                    Some(NullTreatment::from(null_treatment))
602                }
603                None => None,
604            };
605
606            Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
607                agg_fn,
608                parse_exprs(&pb.args, registry, codec)?,
609                pb.distinct,
610                parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new),
611                parse_sorts(&pb.order_by, registry, codec)?,
612                null_treatment,
613            )))
614        }
615
616        ExprType::GroupingSet(GroupingSetNode { expr }) => {
617            Ok(Expr::GroupingSet(GroupingSets(
618                expr.iter()
619                    .map(|expr_list| parse_exprs(&expr_list.expr, registry, codec))
620                    .collect::<Result<Vec<_>, Error>>()?,
621            )))
622        }
623        ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
624            parse_exprs(expr, registry, codec)?,
625        ))),
626        ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
627            GroupingSet::Rollup(parse_exprs(expr, registry, codec)?),
628        )),
629        ExprType::Placeholder(PlaceholderNode {
630            id,
631            data_type,
632            nullable,
633            metadata,
634        }) => match data_type {
635            None => Ok(Expr::Placeholder(Placeholder::new_with_field(
636                id.clone(),
637                None,
638            ))),
639            Some(data_type) => {
640                let field =
641                    Field::new("", data_type.try_into()?, nullable.unwrap_or(true))
642                        .with_metadata(metadata.clone());
643                Ok(Expr::Placeholder(Placeholder::new_with_field(
644                    id.clone(),
645                    Some(field.into()),
646                )))
647            }
648        },
649    }
650}
651
652/// Parse a vector of `protobuf::LogicalExprNode`s.
653pub fn parse_exprs<'a, I>(
654    protos: I,
655    registry: &dyn FunctionRegistry,
656    codec: &dyn LogicalExtensionCodec,
657) -> Result<Vec<Expr>, Error>
658where
659    I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
660{
661    let res = protos
662        .into_iter()
663        .map(|elem| {
664            parse_expr(elem, registry, codec).map_err(|e| plan_datafusion_err!("{}", e))
665        })
666        .collect::<Result<Vec<_>>>()?;
667    Ok(res)
668}
669
670pub fn parse_sorts<'a, I>(
671    protos: I,
672    registry: &dyn FunctionRegistry,
673    codec: &dyn LogicalExtensionCodec,
674) -> Result<Vec<Sort>, Error>
675where
676    I: IntoIterator<Item = &'a protobuf::SortExprNode>,
677{
678    protos
679        .into_iter()
680        .map(|sort| parse_sort(sort, registry, codec))
681        .collect::<Result<Vec<Sort>, Error>>()
682}
683
684pub fn parse_sort(
685    sort: &protobuf::SortExprNode,
686    registry: &dyn FunctionRegistry,
687    codec: &dyn LogicalExtensionCodec,
688) -> Result<Sort, Error> {
689    Ok(Sort::new(
690        parse_required_expr(sort.expr.as_ref(), registry, "expr", codec)?,
691        sort.asc,
692        sort.nulls_first,
693    ))
694}
695
696/// Parse an optional escape_char for Like, ILike, SimilarTo
697fn parse_escape_char(s: &str) -> Result<Option<char>> {
698    match s.len() {
699        0 => Ok(None),
700        1 => Ok(s.chars().next()),
701        _ => internal_err!("Invalid length for escape char"),
702    }
703}
704
705pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
706    match op {
707        "And" => Ok(Operator::And),
708        "Or" => Ok(Operator::Or),
709        "Eq" => Ok(Operator::Eq),
710        "NotEq" => Ok(Operator::NotEq),
711        "LtEq" => Ok(Operator::LtEq),
712        "Lt" => Ok(Operator::Lt),
713        "Gt" => Ok(Operator::Gt),
714        "GtEq" => Ok(Operator::GtEq),
715        "Plus" => Ok(Operator::Plus),
716        "Minus" => Ok(Operator::Minus),
717        "Multiply" => Ok(Operator::Multiply),
718        "Divide" => Ok(Operator::Divide),
719        "Modulo" => Ok(Operator::Modulo),
720        "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
721        "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
722        "BitwiseAnd" => Ok(Operator::BitwiseAnd),
723        "BitwiseOr" => Ok(Operator::BitwiseOr),
724        "BitwiseXor" => Ok(Operator::BitwiseXor),
725        "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
726        "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
727        "RegexIMatch" => Ok(Operator::RegexIMatch),
728        "RegexMatch" => Ok(Operator::RegexMatch),
729        "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
730        "RegexNotMatch" => Ok(Operator::RegexNotMatch),
731        "LikeMatch" => Ok(Operator::LikeMatch),
732        "ILikeMatch" => Ok(Operator::ILikeMatch),
733        "NotLikeMatch" => Ok(Operator::NotLikeMatch),
734        "NotILikeMatch" => Ok(Operator::NotILikeMatch),
735        "StringConcat" => Ok(Operator::StringConcat),
736        "AtArrow" => Ok(Operator::AtArrow),
737        "ArrowAt" => Ok(Operator::ArrowAt),
738        other => Err(proto_error(format!(
739            "Unsupported binary operator '{other:?}'"
740        ))),
741    }
742}
743
744fn parse_optional_expr(
745    p: Option<&protobuf::LogicalExprNode>,
746    registry: &dyn FunctionRegistry,
747    codec: &dyn LogicalExtensionCodec,
748) -> Result<Option<Expr>, Error> {
749    match p {
750        Some(expr) => parse_expr(expr, registry, codec).map(Some),
751        None => Ok(None),
752    }
753}
754
755fn parse_required_expr(
756    p: Option<&protobuf::LogicalExprNode>,
757    registry: &dyn FunctionRegistry,
758    field: impl Into<String>,
759    codec: &dyn LogicalExtensionCodec,
760) -> Result<Expr, Error> {
761    match p {
762        Some(expr) => parse_expr(expr, registry, codec),
763        None => Err(Error::required(field)),
764    }
765}
766
767fn proto_error<S: Into<String>>(message: S) -> Error {
768    Error::General(message.into())
769}