datafusion_proto/logical_plan/from_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use datafusion_common::{
22    exec_datafusion_err, internal_err, plan_datafusion_err, NullEquality,
23    RecursionUnnestOption, Result, ScalarValue, TableReference, UnnestOptions,
24};
25use datafusion_execution::registry::FunctionRegistry;
26use datafusion_expr::dml::InsertOp;
27use datafusion_expr::expr::{Alias, NullTreatment, Placeholder, Sort};
28use datafusion_expr::expr::{Unnest, WildcardOptions};
29use datafusion_expr::{
30    expr::{self, InList, WindowFunction},
31    logical_plan::{PlanType, StringifiedPlan},
32    Between, BinaryExpr, Case, Cast, Expr, GroupingSet,
33    GroupingSet::GroupingSets,
34    JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
35    WindowFrameUnits,
36};
37use datafusion_expr::{ExprFunctionExt, WriteOp};
38use datafusion_proto_common::{from_proto::FromOptionalField, FromProtoError as Error};
39
40use crate::protobuf::plan_type::PlanTypeEnum::{
41    FinalPhysicalPlanWithSchema, InitialPhysicalPlanWithSchema,
42};
43use crate::protobuf::{
44    self,
45    plan_type::PlanTypeEnum::{
46        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
47        FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
48        InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
49        OptimizedPhysicalPlan, PhysicalPlanError,
50    },
51    AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
52    OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
53};
54
55use super::LogicalExtensionCodec;
56
57impl From<&protobuf::UnnestOptions> for UnnestOptions {
58    fn from(opts: &protobuf::UnnestOptions) -> Self {
59        Self {
60            preserve_nulls: opts.preserve_nulls,
61            recursions: opts
62                .recursions
63                .iter()
64                .map(|r| RecursionUnnestOption {
65                    input_column: r.input_column.as_ref().unwrap().into(),
66                    output_column: r.output_column.as_ref().unwrap().into(),
67                    depth: r.depth as usize,
68                })
69                .collect::<Vec<_>>(),
70        }
71    }
72}
73
74impl From<protobuf::WindowFrameUnits> for WindowFrameUnits {
75    fn from(units: protobuf::WindowFrameUnits) -> Self {
76        match units {
77            protobuf::WindowFrameUnits::Rows => Self::Rows,
78            protobuf::WindowFrameUnits::Range => Self::Range,
79            protobuf::WindowFrameUnits::Groups => Self::Groups,
80        }
81    }
82}
83
84impl TryFrom<protobuf::TableReference> for TableReference {
85    type Error = Error;
86
87    fn try_from(value: protobuf::TableReference) -> Result<Self, Self::Error> {
88        use protobuf::table_reference::TableReferenceEnum;
89        let table_reference_enum = value
90            .table_reference_enum
91            .ok_or_else(|| Error::required("table_reference_enum"))?;
92
93        match table_reference_enum {
94            TableReferenceEnum::Bare(protobuf::BareTableReference { table }) => {
95                Ok(TableReference::bare(table))
96            }
97            TableReferenceEnum::Partial(protobuf::PartialTableReference {
98                schema,
99                table,
100            }) => Ok(TableReference::partial(schema, table)),
101            TableReferenceEnum::Full(protobuf::FullTableReference {
102                catalog,
103                schema,
104                table,
105            }) => Ok(TableReference::full(catalog, schema, table)),
106        }
107    }
108}
109
110impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
111    fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
112        Self {
113            plan_type: match stringified_plan
114                .plan_type
115                .as_ref()
116                .and_then(|pt| pt.plan_type_enum.as_ref())
117                .unwrap_or_else(|| {
118                    panic!(
119                        "Cannot create protobuf::StringifiedPlan from {stringified_plan:?}"
120                    )
121                }) {
122                InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
123                AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
124                    PlanType::AnalyzedLogicalPlan {
125                        analyzer_name:analyzer_name.clone()
126                    }
127                }
128                FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
129                OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
130                    PlanType::OptimizedLogicalPlan {
131                        optimizer_name: optimizer_name.clone(),
132                    }
133                }
134                FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
135                InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
136                InitialPhysicalPlanWithStats(_) => PlanType::InitialPhysicalPlanWithStats,
137                InitialPhysicalPlanWithSchema(_) => PlanType::InitialPhysicalPlanWithSchema,
138                OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
139                    PlanType::OptimizedPhysicalPlan {
140                        optimizer_name: optimizer_name.clone(),
141                    }
142                }
143                FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
144                FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
145                FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
146                PhysicalPlanError(_) => PlanType::PhysicalPlanError,
147            },
148            plan: Arc::new(stringified_plan.plan.clone()),
149        }
150    }
151}
152
153impl TryFrom<protobuf::WindowFrame> for WindowFrame {
154    type Error = Error;
155
156    fn try_from(window: protobuf::WindowFrame) -> Result<Self, Self::Error> {
157        let units = protobuf::WindowFrameUnits::try_from(window.window_frame_units)
158            .map_err(|_| Error::unknown("WindowFrameUnits", window.window_frame_units))?
159            .into();
160        let start_bound = window.start_bound.required("start_bound")?;
161        let end_bound = window
162            .end_bound
163            .map(|end_bound| match end_bound {
164                protobuf::window_frame::EndBound::Bound(end_bound) => {
165                    end_bound.try_into()
166                }
167            })
168            .transpose()?
169            .unwrap_or(WindowFrameBound::CurrentRow);
170        Ok(WindowFrame::new_bounds(units, start_bound, end_bound))
171    }
172}
173
174impl TryFrom<protobuf::WindowFrameBound> for WindowFrameBound {
175    type Error = Error;
176
177    fn try_from(bound: protobuf::WindowFrameBound) -> Result<Self, Self::Error> {
178        let bound_type =
179            protobuf::WindowFrameBoundType::try_from(bound.window_frame_bound_type)
180                .map_err(|_| {
181                    Error::unknown("WindowFrameBoundType", bound.window_frame_bound_type)
182                })?;
183        match bound_type {
184            protobuf::WindowFrameBoundType::CurrentRow => Ok(Self::CurrentRow),
185            protobuf::WindowFrameBoundType::Preceding => match bound.bound_value {
186                Some(x) => Ok(Self::Preceding(ScalarValue::try_from(&x)?)),
187                None => Ok(Self::Preceding(ScalarValue::UInt64(None))),
188            },
189            protobuf::WindowFrameBoundType::Following => match bound.bound_value {
190                Some(x) => Ok(Self::Following(ScalarValue::try_from(&x)?)),
191                None => Ok(Self::Following(ScalarValue::UInt64(None))),
192            },
193        }
194    }
195}
196
197impl From<protobuf::JoinType> for JoinType {
198    fn from(t: protobuf::JoinType) -> Self {
199        match t {
200            protobuf::JoinType::Inner => JoinType::Inner,
201            protobuf::JoinType::Left => JoinType::Left,
202            protobuf::JoinType::Right => JoinType::Right,
203            protobuf::JoinType::Full => JoinType::Full,
204            protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
205            protobuf::JoinType::Rightsemi => JoinType::RightSemi,
206            protobuf::JoinType::Leftanti => JoinType::LeftAnti,
207            protobuf::JoinType::Rightanti => JoinType::RightAnti,
208            protobuf::JoinType::Leftmark => JoinType::LeftMark,
209            protobuf::JoinType::Rightmark => JoinType::RightMark,
210        }
211    }
212}
213
214impl From<protobuf::JoinConstraint> for JoinConstraint {
215    fn from(t: protobuf::JoinConstraint) -> Self {
216        match t {
217            protobuf::JoinConstraint::On => JoinConstraint::On,
218            protobuf::JoinConstraint::Using => JoinConstraint::Using,
219        }
220    }
221}
222
223impl From<protobuf::NullEquality> for NullEquality {
224    fn from(t: protobuf::NullEquality) -> Self {
225        match t {
226            protobuf::NullEquality::NullEqualsNothing => NullEquality::NullEqualsNothing,
227            protobuf::NullEquality::NullEqualsNull => NullEquality::NullEqualsNull,
228        }
229    }
230}
231
232impl From<protobuf::dml_node::Type> for WriteOp {
233    fn from(t: protobuf::dml_node::Type) -> Self {
234        match t {
235            protobuf::dml_node::Type::Update => WriteOp::Update,
236            protobuf::dml_node::Type::Delete => WriteOp::Delete,
237            protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
238            protobuf::dml_node::Type::InsertOverwrite => {
239                WriteOp::Insert(InsertOp::Overwrite)
240            }
241            protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
242            protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
243        }
244    }
245}
246
247impl From<protobuf::NullTreatment> for NullTreatment {
248    fn from(t: protobuf::NullTreatment) -> Self {
249        match t {
250            protobuf::NullTreatment::RespectNulls => NullTreatment::RespectNulls,
251            protobuf::NullTreatment::IgnoreNulls => NullTreatment::IgnoreNulls,
252        }
253    }
254}
255
256pub fn parse_expr(
257    proto: &protobuf::LogicalExprNode,
258    registry: &dyn FunctionRegistry,
259    codec: &dyn LogicalExtensionCodec,
260) -> Result<Expr, Error> {
261    use protobuf::{logical_expr_node::ExprType, window_expr_node};
262
263    let expr_type = proto
264        .expr_type
265        .as_ref()
266        .ok_or_else(|| Error::required("expr_type"))?;
267
268    match expr_type {
269        ExprType::BinaryExpr(binary_expr) => {
270            let op = from_proto_binary_op(&binary_expr.op)?;
271            let operands = parse_exprs(&binary_expr.operands, registry, codec)?;
272
273            if operands.len() < 2 {
274                return Err(proto_error(
275                    "A binary expression must always have at least 2 operands",
276                ));
277            }
278
279            // Reduce the linearized operands (ordered by left innermost to right
280            // outermost) into a single expression tree.
281            Ok(operands
282                .into_iter()
283                .reduce(|left, right| {
284                    Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
285                })
286                .expect("Binary expression could not be reduced to a single expression."))
287        }
288        ExprType::Column(column) => Ok(Expr::Column(column.into())),
289        ExprType::Literal(literal) => {
290            let scalar_value: ScalarValue = literal.try_into()?;
291            Ok(Expr::Literal(scalar_value, None))
292        }
293        ExprType::WindowExpr(expr) => {
294            let window_function = expr
295                .window_function
296                .as_ref()
297                .ok_or_else(|| Error::required("window_function"))?;
298            let partition_by = parse_exprs(&expr.partition_by, registry, codec)?;
299            let mut order_by = parse_sorts(&expr.order_by, registry, codec)?;
300            let window_frame = expr
301                .window_frame
302                .as_ref()
303                .map::<Result<WindowFrame, _>, _>(|window_frame| {
304                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
305                    window_frame
306                        .regularize_order_bys(&mut order_by)
307                        .map(|_| window_frame)
308                })
309                .transpose()?
310                .ok_or_else(|| {
311                    exec_datafusion_err!("missing window frame during deserialization")
312                })?;
313
314            let null_treatment = match expr.null_treatment {
315                Some(null_treatment) => {
316                    let null_treatment  =  protobuf::NullTreatment::try_from(null_treatment)
317                    .map_err(|_| {
318                        proto_error(format!(
319                            "Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
320                        ))
321                    })?;
322                    Some(NullTreatment::from(null_treatment))
323                }
324                None => None,
325            };
326
327            let agg_fn = match window_function {
328                window_expr_node::WindowFunction::Udaf(udaf_name) => {
329                    let udaf_function = match &expr.fun_definition {
330                        Some(buf) => codec.try_decode_udaf(udaf_name, buf)?,
331                        None => registry
332                            .udaf(udaf_name)
333                            .or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?,
334                    };
335                    expr::WindowFunctionDefinition::AggregateUDF(udaf_function)
336                }
337                window_expr_node::WindowFunction::Udwf(udwf_name) => {
338                    let udwf_function = match &expr.fun_definition {
339                        Some(buf) => codec.try_decode_udwf(udwf_name, buf)?,
340                        None => registry
341                            .udwf(udwf_name)
342                            .or_else(|_| codec.try_decode_udwf(udwf_name, &[]))?,
343                    };
344                    expr::WindowFunctionDefinition::WindowUDF(udwf_function)
345                }
346            };
347
348            let args = parse_exprs(&expr.exprs, registry, codec)?;
349            let mut builder = Expr::from(WindowFunction::new(agg_fn, args))
350                .partition_by(partition_by)
351                .order_by(order_by)
352                .window_frame(window_frame)
353                .null_treatment(null_treatment);
354
355            if expr.distinct {
356                builder = builder.distinct();
357            };
358
359            if let Some(filter) =
360                parse_optional_expr(expr.filter.as_deref(), registry, codec)?
361            {
362                builder = builder.filter(filter);
363            }
364
365            builder.build().map_err(Error::DataFusionError)
366        }
367        ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
368            parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?,
369            alias
370                .relation
371                .first()
372                .map(|r| TableReference::try_from(r.clone()))
373                .transpose()?,
374            alias.alias.clone(),
375        ))),
376        ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
377            is_null.expr.as_deref(),
378            registry,
379            "expr",
380            codec,
381        )?))),
382        ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new(
383            parse_required_expr(is_not_null.expr.as_deref(), registry, "expr", codec)?,
384        ))),
385        ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr(
386            not.expr.as_deref(),
387            registry,
388            "expr",
389            codec,
390        )?))),
391        ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr(
392            msg.expr.as_deref(),
393            registry,
394            "expr",
395            codec,
396        )?))),
397        ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr(
398            msg.expr.as_deref(),
399            registry,
400            "expr",
401            codec,
402        )?))),
403        ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr(
404            msg.expr.as_deref(),
405            registry,
406            "expr",
407            codec,
408        )?))),
409        ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr(
410            msg.expr.as_deref(),
411            registry,
412            "expr",
413            codec,
414        )?))),
415        ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr(
416            msg.expr.as_deref(),
417            registry,
418            "expr",
419            codec,
420        )?))),
421        ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new(
422            parse_required_expr(msg.expr.as_deref(), registry, "expr", codec)?,
423        ))),
424        ExprType::Between(between) => Ok(Expr::Between(Between::new(
425            Box::new(parse_required_expr(
426                between.expr.as_deref(),
427                registry,
428                "expr",
429                codec,
430            )?),
431            between.negated,
432            Box::new(parse_required_expr(
433                between.low.as_deref(),
434                registry,
435                "expr",
436                codec,
437            )?),
438            Box::new(parse_required_expr(
439                between.high.as_deref(),
440                registry,
441                "expr",
442                codec,
443            )?),
444        ))),
445        ExprType::Like(like) => Ok(Expr::Like(Like::new(
446            like.negated,
447            Box::new(parse_required_expr(
448                like.expr.as_deref(),
449                registry,
450                "expr",
451                codec,
452            )?),
453            Box::new(parse_required_expr(
454                like.pattern.as_deref(),
455                registry,
456                "pattern",
457                codec,
458            )?),
459            parse_escape_char(&like.escape_char)?,
460            false,
461        ))),
462        ExprType::Ilike(like) => Ok(Expr::Like(Like::new(
463            like.negated,
464            Box::new(parse_required_expr(
465                like.expr.as_deref(),
466                registry,
467                "expr",
468                codec,
469            )?),
470            Box::new(parse_required_expr(
471                like.pattern.as_deref(),
472                registry,
473                "pattern",
474                codec,
475            )?),
476            parse_escape_char(&like.escape_char)?,
477            true,
478        ))),
479        ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new(
480            like.negated,
481            Box::new(parse_required_expr(
482                like.expr.as_deref(),
483                registry,
484                "expr",
485                codec,
486            )?),
487            Box::new(parse_required_expr(
488                like.pattern.as_deref(),
489                registry,
490                "pattern",
491                codec,
492            )?),
493            parse_escape_char(&like.escape_char)?,
494            false,
495        ))),
496        ExprType::Case(case) => {
497            let when_then_expr = case
498                .when_then_expr
499                .iter()
500                .map(|e| {
501                    let when_expr = parse_required_expr(
502                        e.when_expr.as_ref(),
503                        registry,
504                        "when_expr",
505                        codec,
506                    )?;
507                    let then_expr = parse_required_expr(
508                        e.then_expr.as_ref(),
509                        registry,
510                        "then_expr",
511                        codec,
512                    )?;
513                    Ok((Box::new(when_expr), Box::new(then_expr)))
514                })
515                .collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
516            Ok(Expr::Case(Case::new(
517                parse_optional_expr(case.expr.as_deref(), registry, codec)?.map(Box::new),
518                when_then_expr,
519                parse_optional_expr(case.else_expr.as_deref(), registry, codec)?
520                    .map(Box::new),
521            )))
522        }
523        ExprType::Cast(cast) => {
524            let expr = Box::new(parse_required_expr(
525                cast.expr.as_deref(),
526                registry,
527                "expr",
528                codec,
529            )?);
530            let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
531            Ok(Expr::Cast(Cast::new(expr, data_type)))
532        }
533        ExprType::TryCast(cast) => {
534            let expr = Box::new(parse_required_expr(
535                cast.expr.as_deref(),
536                registry,
537                "expr",
538                codec,
539            )?);
540            let data_type = cast.arrow_type.as_ref().required("arrow_type")?;
541            Ok(Expr::TryCast(TryCast::new(expr, data_type)))
542        }
543        ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
544            parse_required_expr(negative.expr.as_deref(), registry, "expr", codec)?,
545        ))),
546        ExprType::Unnest(unnest) => {
547            let mut exprs = parse_exprs(&unnest.exprs, registry, codec)?;
548            if exprs.len() != 1 {
549                return Err(proto_error("Unnest must have exactly one expression"));
550            }
551            Ok(Expr::Unnest(Unnest::new(exprs.swap_remove(0))))
552        }
553        ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
554            Box::new(parse_required_expr(
555                in_list.expr.as_deref(),
556                registry,
557                "expr",
558                codec,
559            )?),
560            parse_exprs(&in_list.list, registry, codec)?,
561            in_list.negated,
562        ))),
563        ExprType::Wildcard(protobuf::Wildcard { qualifier }) => {
564            let qualifier = qualifier.to_owned().map(|x| x.try_into()).transpose()?;
565            #[expect(deprecated)]
566            Ok(Expr::Wildcard {
567                qualifier,
568                options: Box::new(WildcardOptions::default()),
569            })
570        }
571        ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
572            fun_name,
573            args,
574            fun_definition,
575        }) => {
576            let scalar_fn = match fun_definition {
577                Some(buf) => codec.try_decode_udf(fun_name, buf)?,
578                None => registry
579                    .udf(fun_name.as_str())
580                    .or_else(|_| codec.try_decode_udf(fun_name, &[]))?,
581            };
582            Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
583                scalar_fn,
584                parse_exprs(args, registry, codec)?,
585            )))
586        }
587        ExprType::AggregateUdfExpr(pb) => {
588            let agg_fn = match &pb.fun_definition {
589                Some(buf) => codec.try_decode_udaf(&pb.fun_name, buf)?,
590                None => registry
591                    .udaf(&pb.fun_name)
592                    .or_else(|_| codec.try_decode_udaf(&pb.fun_name, &[]))?,
593            };
594            let null_treatment = match pb.null_treatment {
595                Some(null_treatment) => {
596                    let null_treatment  =  protobuf::NullTreatment::try_from(null_treatment)
597                    .map_err(|_| {
598                        proto_error(format!(
599                            "Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
600                        ))
601                    })?;
602                    Some(NullTreatment::from(null_treatment))
603                }
604                None => None,
605            };
606
607            Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
608                agg_fn,
609                parse_exprs(&pb.args, registry, codec)?,
610                pb.distinct,
611                parse_optional_expr(pb.filter.as_deref(), registry, codec)?.map(Box::new),
612                parse_sorts(&pb.order_by, registry, codec)?,
613                null_treatment,
614            )))
615        }
616
617        ExprType::GroupingSet(GroupingSetNode { expr }) => {
618            Ok(Expr::GroupingSet(GroupingSets(
619                expr.iter()
620                    .map(|expr_list| parse_exprs(&expr_list.expr, registry, codec))
621                    .collect::<Result<Vec<_>, Error>>()?,
622            )))
623        }
624        ExprType::Cube(CubeNode { expr }) => Ok(Expr::GroupingSet(GroupingSet::Cube(
625            parse_exprs(expr, registry, codec)?,
626        ))),
627        ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
628            GroupingSet::Rollup(parse_exprs(expr, registry, codec)?),
629        )),
630        ExprType::Placeholder(PlaceholderNode {
631            id,
632            data_type,
633            nullable,
634            metadata,
635        }) => match data_type {
636            None => Ok(Expr::Placeholder(Placeholder::new_with_field(
637                id.clone(),
638                None,
639            ))),
640            Some(data_type) => {
641                let field =
642                    Field::new("", data_type.try_into()?, nullable.unwrap_or(true))
643                        .with_metadata(metadata.clone());
644                Ok(Expr::Placeholder(Placeholder::new_with_field(
645                    id.clone(),
646                    Some(field.into()),
647                )))
648            }
649        },
650    }
651}
652
653/// Parse a vector of `protobuf::LogicalExprNode`s.
654pub fn parse_exprs<'a, I>(
655    protos: I,
656    registry: &dyn FunctionRegistry,
657    codec: &dyn LogicalExtensionCodec,
658) -> Result<Vec<Expr>, Error>
659where
660    I: IntoIterator<Item = &'a protobuf::LogicalExprNode>,
661{
662    let res = protos
663        .into_iter()
664        .map(|elem| {
665            parse_expr(elem, registry, codec).map_err(|e| plan_datafusion_err!("{}", e))
666        })
667        .collect::<Result<Vec<_>>>()?;
668    Ok(res)
669}
670
671pub fn parse_sorts<'a, I>(
672    protos: I,
673    registry: &dyn FunctionRegistry,
674    codec: &dyn LogicalExtensionCodec,
675) -> Result<Vec<Sort>, Error>
676where
677    I: IntoIterator<Item = &'a protobuf::SortExprNode>,
678{
679    protos
680        .into_iter()
681        .map(|sort| parse_sort(sort, registry, codec))
682        .collect::<Result<Vec<Sort>, Error>>()
683}
684
685pub fn parse_sort(
686    sort: &protobuf::SortExprNode,
687    registry: &dyn FunctionRegistry,
688    codec: &dyn LogicalExtensionCodec,
689) -> Result<Sort, Error> {
690    Ok(Sort::new(
691        parse_required_expr(sort.expr.as_ref(), registry, "expr", codec)?,
692        sort.asc,
693        sort.nulls_first,
694    ))
695}
696
697/// Parse an optional escape_char for Like, ILike, SimilarTo
698fn parse_escape_char(s: &str) -> Result<Option<char>> {
699    match s.len() {
700        0 => Ok(None),
701        1 => Ok(s.chars().next()),
702        _ => internal_err!("Invalid length for escape char"),
703    }
704}
705
706pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
707    match op {
708        "And" => Ok(Operator::And),
709        "Or" => Ok(Operator::Or),
710        "Eq" => Ok(Operator::Eq),
711        "NotEq" => Ok(Operator::NotEq),
712        "LtEq" => Ok(Operator::LtEq),
713        "Lt" => Ok(Operator::Lt),
714        "Gt" => Ok(Operator::Gt),
715        "GtEq" => Ok(Operator::GtEq),
716        "Plus" => Ok(Operator::Plus),
717        "Minus" => Ok(Operator::Minus),
718        "Multiply" => Ok(Operator::Multiply),
719        "Divide" => Ok(Operator::Divide),
720        "Modulo" => Ok(Operator::Modulo),
721        "IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
722        "IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
723        "BitwiseAnd" => Ok(Operator::BitwiseAnd),
724        "BitwiseOr" => Ok(Operator::BitwiseOr),
725        "BitwiseXor" => Ok(Operator::BitwiseXor),
726        "BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
727        "BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
728        "RegexIMatch" => Ok(Operator::RegexIMatch),
729        "RegexMatch" => Ok(Operator::RegexMatch),
730        "RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
731        "RegexNotMatch" => Ok(Operator::RegexNotMatch),
732        "StringConcat" => Ok(Operator::StringConcat),
733        "AtArrow" => Ok(Operator::AtArrow),
734        "ArrowAt" => Ok(Operator::ArrowAt),
735        other => Err(proto_error(format!(
736            "Unsupported binary operator '{other:?}'"
737        ))),
738    }
739}
740
741fn parse_optional_expr(
742    p: Option<&protobuf::LogicalExprNode>,
743    registry: &dyn FunctionRegistry,
744    codec: &dyn LogicalExtensionCodec,
745) -> Result<Option<Expr>, Error> {
746    match p {
747        Some(expr) => parse_expr(expr, registry, codec).map(Some),
748        None => Ok(None),
749    }
750}
751
752fn parse_required_expr(
753    p: Option<&protobuf::LogicalExprNode>,
754    registry: &dyn FunctionRegistry,
755    field: impl Into<String>,
756    codec: &dyn LogicalExtensionCodec,
757) -> Result<Expr, Error> {
758    match p {
759        Some(expr) => parse_expr(expr, registry, codec),
760        None => Err(Error::required(field)),
761    }
762}
763
764fn proto_error<S: Into<String>>(message: S) -> Error {
765    Error::General(message.into())
766}